Project: Apriori Algorithm for Finding Frequent Itemsets with PySpark

Task 1: Import the Libraries and Set Up the Environment

import itertools
import os

import findspark

# Set environment variables within the notebook before initializing findspark
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home'  # Verify this path
os.environ['SPARK_HOME'] = '/opt/homebrew/opt/apache-spark'  # Verify this path
findspark.init()

import pyspark

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("example").getOrCreate()
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: Permission denied
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: exec: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: cannot execute: Undefined error: 0
...
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
spark.getOrCreate()
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Value"])

df.show()
conf = pyspark.SparkConf()
conf.setAppName('apriori')
conf.setMaster('local')
context = pyspark.SparkContext(conf=conf)
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
from pyspark import SparkContext
SparkContext.getOrCreate().stop()
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
session = pyspark.sql.SparkSession(context)
conf.getAll()

The repeated [JAVA_GATEWAY_EXITED] failures above appear to be a local environment problem rather than an issue with the code; the notes after the End section revisit the JAVA_HOME, SPARK_HOME, and PATH settings.

Task 2: Generate Combinations—Parent Intersection Property

def pre_check(freq_k_1, k):
    # Merge every pair of frequent (k-1)-itemsets that share at least k-2 items
    # into a sorted k-item candidate (the parent intersection property).
    k_size_comb = []
    for i in range(len(freq_k_1)):
        x = set(freq_k_1[i])
        for j in range(i):
            y = set(freq_k_1[j])
            if len(x.intersection(y)) >= (k-2):
                k_size_comb.append(tuple(sorted(x.union(y))))
    return k_size_comb
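A small illustration with hypothetical frequent 2-itemsets shows the merge step: any two parents that share at least k-2 items produce a sorted k-item candidate.

freq_2 = [('bread', 'butter'), ('bread', 'milk'), ('butter', 'milk')]
print(pre_check(freq_2, 3))
# Every pair of parents overlaps in exactly one item, so each pair yields the same
# candidate ('bread', 'butter', 'milk'); duplicates are tolerated at this stage.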

Task 3: Generate Combinations—Subset Frequency Property

def post_check(k_size_comb, freq_k_1, k):
    # Keep a candidate only if every one of its (k-1)-item subsets is itself
    # frequent (the subset frequency property).
    filtered = []
    for comb in k_size_comb:
        if all(sub_comb in freq_k_1 for sub_comb in itertools.combinations(comb, k-1)):
            filtered.append(tuple(comb))
    return filtered
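Continuing the hypothetical example, a candidate survives this check only when every (k-1)-item subset is itself frequent.

candidates = [('bread', 'butter', 'milk')]
print(post_check(candidates, [('bread', 'butter'), ('bread', 'milk'), ('butter', 'milk')], 3))
# kept: all three 2-item subsets are frequent
print(post_check(candidates, [('bread', 'butter'), ('bread', 'milk')], 3))
# dropped: ('butter', 'milk') is not frequent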

Task 4: Count Check

def count_check(filtered, lines, supCount):
    # Count, for each candidate, the number of transactions that contain all of
    # its items, and keep the candidates that reach the support count.
    results = []
    counts = dict(zip(filtered, [0]*len(filtered)))
    for combination in counts:  # iterate unique candidates (pre_check may emit duplicates)
        for line in lines:
            if all(item in line for item in combination):
                counts[combination] += 1

    for combination, count in counts.items():
        if count >= supCount:
            results.append(combination)
    return results
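With hypothetical transactions, a candidate is counted once for every transaction that contains all of its items, and only candidates reaching the support count are returned.

transactions = [['bread', 'butter', 'milk'], ['bread', 'butter'], ['bread', 'milk']]
print(count_check([('bread', 'butter'), ('butter', 'milk')], transactions, 2))
# expected: [('bread', 'butter')], present in two transactions; ('butter', 'milk') appears in only one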

Task 5: Generate k-Size Combinations

def generator(freq_k_1, k, partition, support):
    # One Apriori round on a partition: generate k-item candidates from the
    # frequent (k-1)-itemsets, prune them, and keep those that meet the
    # local support count.
    lines = list(partition)
    supCount = len(lines)*support

    k_size_comb = pre_check(freq_k_1, k)
    filtered = post_check(k_size_comb, freq_k_1, k)

    return count_check(filtered, lines, supCount)
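One full round can be exercised locally with hypothetical transactions: at a support of 0.5 over four transactions the threshold is two occurrences, so the single 3-item candidate fails.

transactions = [
    ['bread', 'butter', 'milk'],
    ['bread', 'butter'],
    ['bread', 'milk'],
    ['butter', 'milk'],
]
freq_2 = [('bread', 'butter'), ('bread', 'milk'), ('butter', 'milk')]
print(generator(freq_2, 3, transactions, 0.5))
# expected: [], since ('bread', 'butter', 'milk') occurs in only one transaction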

Task 6: Generate Singles

def get_singles(lines, support):
    # Count every individual item and keep the ones whose count meets the
    # support threshold for this partition.
    supCount = len(lines)*support
    vocab = set()
    for line in lines:
        for word in line:
            vocab.add(word)
    counts = dict.fromkeys(vocab, 0)
    combinations = []
    for line in lines:
        for word in line:
            counts[word] += 1
    for word, count in counts.items():
        if count >= supCount:
            combinations.append((word,))
    return sorted(combinations)
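For example, with hypothetical transactions and a support of 0.5, an item must appear in at least two of the four transactions to survive.

transactions = [['bread', 'butter'], ['bread', 'milk'], ['bread'], ['jam']]
print(get_singles(transactions, 0.5))
# expected: [('bread',)]; butter, milk, and jam each appear only once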

Task 7: The Worker Partition Mapper

# Default maximum itemset length (re-broadcast with the real value in Task 8).
seq_len = context.broadcast(2)

def apriori(iterator):
    # Materialize this partition's transactions, then run the Apriori rounds
    # locally using the broadcast support threshold and maximum length.
    partition = []
    for v in iterator:
        partition.append(v)
    support = sup.value
    results = get_singles(partition, support)
    print('starting with', results)

    for k in range(2, seq_len.value + 1):
        print('sequence length', k)

        combos = generator(results, k, partition, support)

        # Stop early once no k-itemset is frequent in this partition.
        if len(combos) == 0:
            print('ending at sequence length', k - 1)
            return results

        results = combos
    return results
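The mapper relies on the sup and seq_len broadcast variables. The sketch below is a purely local sanity check that temporarily shadows them with a minimal stand-in class (an assumption made only for illustration), so apriori can be run without a SparkContext.

class FakeBroadcast:
    # Minimal stand-in exposing only the .value attribute that apriori() reads.
    def __init__(self, value):
        self.value = value

sup = FakeBroadcast(0.5)      # hypothetical support threshold
seq_len = FakeBroadcast(3)    # hypothetical maximum itemset length

transactions = [
    ['bread', 'butter', 'milk'],
    ['bread', 'butter'],
    ['bread', 'milk'],
    ['butter', 'milk'],
]
print(apriori(iter(transactions)))
# Every pair is frequent here but the triple is not, so the call should return
# the three 2-itemsets.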

Task 8: Load Data and Preprocess

rdd = context.textFile("usercode/Dataset.csv")

# Drop the header row: wrap it in a list so parallelize creates a one-element RDD,
# then subtract it from the data.
tagsheader = rdd.first()
tags = context.parallelize([tagsheader])
data = rdd.subtract(tags)

# Broadcast the run parameters: maximum itemset length, number of transactions,
# and the minimum support threshold.
seq_len = context.broadcast(3)
length = context.broadcast(data.count())
sup = context.broadcast(0.03)

# Strip the surrounding quotes and split each transaction into a list of items.
lines = data.map(lambda x: x.lstrip('"').rstrip('"').split(','))
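The exact layout of Dataset.csv is not shown here; judging from the preprocessing lambda, each raw row is assumed to be a quoted, comma-separated list of items. A hypothetical row would be transformed like this:

row = '"bread,butter,milk"'   # hypothetical raw CSV row
print(row.lstrip('"').rstrip('"').split(','))
# -> ['bread', 'butter', 'milk']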

Task 9: The Distributed Transform

# Run apriori independently on every partition; each partition reports the
# itemsets that are frequent locally, so the union is only a set of candidates.
freq = lines.mapPartitions(apriori)
# Different partitions can propose the same candidate, so deduplicate.
freq = freq.distinct()
comb = freq.collect()
print("Possible frequent itemset(s):\n", comb)

Task 10: Auxiliary Function to Check Presence

def auxiliary(row, combinations):
    # Return every candidate combination whose items are all present in the row.
    present = []
    for combination in combinations:
        if all(item in row for item in combination):
            present.append(combination)
    return present
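For instance, with a hypothetical row and candidate list, only the candidates fully contained in the row are returned.

row = ['bread', 'butter', 'milk']
print(auxiliary(row, [('bread', 'butter'), ('butter', 'jam')]))
# expected: [('bread', 'butter')]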

Task 11: Count Check at Master

# Broadcast the candidate itemsets and count their true, global support.
comb = context.broadcast(comb)

# For every transaction, emit a (candidate, 1) pair for each candidate it contains.
freq1 = lines.map(lambda x: [(key, 1) for key in auxiliary(x, comb.value)]).filter(lambda x: len(x)>0)

freq2 = freq1.flatMap(lambda x: x)
# Sum the counts per candidate and keep those above the global support threshold.
freq3 = freq2.reduceByKey(lambda x, y: x+y)
freq4 = freq3.filter(lambda x: x[1]>sup.value*length.value).map(lambda x: x[0])
freq4.collect()
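As a non-Spark cross-check on hypothetical data, the same global counting can be sketched with a Counter; this only illustrates what the map, flatMap, and reduceByKey chain computes and is not part of the pipeline.

from collections import Counter

rows = [['bread', 'butter', 'milk'], ['bread', 'butter'], ['butter', 'milk']]   # hypothetical transactions
candidates = [('bread', 'butter'), ('butter', 'milk')]                          # hypothetical candidates

counter = Counter()
for row in rows:
    for combination in auxiliary(row, candidates):
        counter[combination] += 1

support, n = 0.5, len(rows)
print([c for c, cnt in counter.items() if cnt > support * n])
# both candidates appear twice and 2 > 1.5, so both pass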

End

The remaining cells are troubleshooting notes for the local environment that produced the [JAVA_GATEWAY_EXITED] errors in Task 1.

import os

# Set JAVA_HOME and SPARK_HOME
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home'
os.environ['SPARK_HOME'] = '/opt/homebrew/opt/apache-spark' 
os.environ['PATH'] = os.environ['JAVA_HOME'] + '/bin:' + os.environ['SPARK_HOME'] + '/bin:' + os.environ['PATH']
# Verify the environment variables are set
print(os.environ['JAVA_HOME'])
print(os.environ['SPARK_HOME'])
print(os.environ['PATH'])
/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home
/opt/homebrew/opt/apache-spark
/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home/bin:/opt/homebrew/opt/apache-spark/bin:/Users/nenadbozinovic/Documents/blog/venv_blog/bin:/Users/nenadbozinovic/.pyenv/shims:/Users/nenadbozinovic/.pyenv/bin:/Library/Frameworks/Python.framework/Versions/3.11/bin:/Library/Frameworks/Python.framework/Versions/3.12/bin:/Library/Frameworks/Python.framework/Versions/3.9/bin:/Library/Frameworks/Python.framework/Versions/3.10/bin:/opt/homebrew/bin:/opt/homebrew/sbin:/usr/local/bin:/System/Cryptexes/App/usr/bin:/usr/bin:/bin:/usr/sbin:/sbin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/local/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/appleinternal/bin:/Library/Apple/usr/bin:/Library/TeX/texbin:/Applications/quarto/bin
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example")
spark
<pyspark.sql.session.SparkSession.Builder at 0x126396ba0>
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Value"])

df.show()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[8], line 2
      1 data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
----> 2 df = spark.createDataFrame(data, ["Name", "Value"])
      4 df.show()

AttributeError: 'Builder' object has no attribute 'createDataFrame'
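The AttributeError occurs because SparkSession.builder.appName("example") returns a Builder, not a session; .getOrCreate() was never called on it. Assuming the environment variables above are now picked up correctly, the corrected cell would look like this:

from pyspark.sql import SparkSession

# .getOrCreate() is what actually launches (or reuses) the session;
# without it, `spark` is only a SparkSession.Builder.
spark = SparkSession.builder.appName("example").getOrCreate()

data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Value"])
df.show()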