PySpark Makina Öğrenmesi (PySpark ML Classification)
Merhaba, PySpark yazılarına devam ediyoruz. Bu yazıda pyspark kullanarak ML modeli geliştireceğiz. Bu yazıya geçmeden önce bir önceki yazıyı yukarıdan okuyabilirsiniz. Hemen başlayalım.
Modeli oluşturmadan önce, veri setini daha düzenli bir hale getirmemiz gerekiyor (Preprocessing). Bir önceki yazıda indirdiğimiz veri setini kullanacağız.
Gerekli olmayan kolonları sileceğiz ve boş (Null veya NaN) değerler var ise onları da veri setinden çıkaracağız.
from pyspark.sql import SparkSession # Create SparkSession object spark = SparkSession.builder \ .master('local[*]') \ .appName('test') \ .getOrCreate() # Read data from CSV file, flights = spark.read.csv('flights-larger.csv', sep=',', header=True, inferSchema=True, nullValue='NA') # Remove the 'flight' column flights_drop_column = flights.drop('flight') # Number of records with missing 'delay' values flights_drop_column.filter('delay IS NULL').count() # Remove records with missing 'delay' values flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL') # Remove records with missing values in any column and get the number of remaining rows flights_none_missing = flights_valid_delay.dropna()
Column manipulation
Federal Havacılık İdaresi (FAA), bir uçuşun planlanan saatinden 15 dakika veya daha uzun bir süre sonra varması durumunda “gecikmeli” olduğunu düşünmektedir. Bu durumda veri setimizde düzenlememiz gereken yerler oluyor.
- Mil olarak verilen kolonu gerekli katsayı ile çarparak km cinsine dönüştürmeliyiz
- Delay kolonunu boolean ‘a dönüştürmeliyiz.
# Import the required function from pyspark.sql.functions import round
# Convert 'mile' to 'km' and drop 'mile' column flights_km = flights_none_missing.withColumn('km', round(flights.mile * 1.60934, 0)) \ .drop('mile') # Create 'label' column indicating whether flight delayed (1) or not (0) flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer')) # Check first five records flights_km.show(5)
Uçuş verilerinde, kategorik verileri tutan iki sütun vardır; carrier ve org. Bu sütunları dizine alınmış sayısal değerlere dönüştürmeniz gerekir.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx') # Indexer identifies categories in the data indexer_model = indexer.fit(flights_km) # Indexer creates a new column with numeric index values flights_indexed = indexer_model.transform(flights_km) # Repeat the process for the other categorical feature flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)
Veri setimizi hazırlamamızın son aşamasında , tüm tahmin sütunlarını tek bir sütunda birleştiriyoruz.
# Create an assembler object assembler = VectorAssembler(inputCols=[ 'mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration' ], outputCol='features') # Consolidate predictor columns flights_assembled = assembler.transform(flights_indexed) # Check the resulting column flights_assembled.select('features', 'delay').show(5, truncate=False)
Bir sonraki yazı da ML modelini gerçekleştireceğiz. Görüşmek üzere.
One comment
Pingback: PySpark Makina Öğrenmesi (PySpark ML Classification) - IT Tutorial