PySpark Makina Öğrenmesi (PySpark ML Classification Preapering)

 

PySpark Makine Öğrenmesi

 

PySpark Makina Öğrenmesi (PySpark ML Classification)

Merhaba, PySpark yazılarına devam ediyoruz.  Bu yazıda pyspark kullanarak ML modeli geliştireceğiz. Bu yazıya geçmeden önce bir önceki yazıyı yukarıdan okuyabilirsiniz.  Hemen başlayalım.

Modeli oluşturmadan önce, veri setini daha düzenli bir hale getirmemiz gerekiyor (Preprocessing).  Bir önceki yazıda indirdiğimiz veri setini kullanacağız.

Gerekli olmayan kolonları sileceğiz ve boş (Null veya NaN) değerler var ise onları da veri setinden çıkaracağız.

from pyspark.sql import SparkSession

# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()
# Read data from CSV file,
flights = spark.read.csv('flights-larger.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')

# Number of records with missing 'delay' values
flights_drop_column.filter('delay IS NULL').count()

# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()

Column manipulation

Federal Havacılık İdaresi (FAA), bir uçuşun planlanan saatinden 15 dakika veya daha uzun bir süre sonra varması durumunda “gecikmeli” olduğunu düşünmektedir. Bu durumda veri setimizde düzenlememiz gereken yerler oluyor. 

  • Mil olarak verilen kolonu gerekli katsayı ile çarparak km cinsine dönüştürmeliyiz
  • Delay kolonunu boolean ‘a dönüştürmeliyiz.

 

# Import the required function

from pyspark.sql.functions import round
# Convert 'mile' to 'km' and drop 'mile' column
flights_km = flights_none_missing.withColumn('km', round(flights.mile * 1.60934, 0)) \
                    .drop('mile')

# Create 'label' column indicating whether flight delayed (1) or not (0)
flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))

# Check first five records
flights_km.show(5)

Uçuş verilerinde, kategorik verileri tutan iki sütun vardır; carrier ve org. Bu sütunları dizine alınmış sayısal değerlere dönüştürmeniz gerekir.

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flights_km)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights_km)

# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)

 

Veri setimizi hazırlamamızın son aşamasında , tüm tahmin sütunlarını tek bir sütunda birleştiriyoruz.

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'
], outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(flights_indexed)

# Check the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)

Bir sonraki yazı da ML modelini gerçekleştireceğiz. Görüşmek üzere.

About Deniz Parlak

Hi, i’m Security Data Scientist & Data Engineer at My Security Analytics. I have experienced Advance Python, Machine Learning and Big Data tools. Also i worked Oracle Database Administration, Migration and upgrade projects. For your questions [email protected]

Leave a Reply

Your email address will not be published. Required fields are marked *