Geeks of Coding


Author: Abhishek Tyagi

      Welcome to the final project of “Apache Spark for Scalable Machine Learning on BigData”. In this assignment you’ll analyze a real-world dataset and apply machine learning on it using Apache Spark.

      In order to pass, you need to implement some code (basically replace the parts marked with $$) and finally answer a quiz on the Coursera platform.

      Let’s start by downloading the dataset and creating a dataframe. The dataset is available on DAX, the IBM Data Asset Exchange, and can be downloaded for free.

      # delete files from previous runs
      !rm -f jfk_weather*
      # download the file containing the data in CSV format
      # extract the data
      !tar xvfz jfk_weather.tar.gz
      # create a dataframe out of it by using the first row as field names and trying to infer a schema based on contents
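      # (assumes the notebook provides a SparkSession named `spark`)
      df = spark.read.option("header", "true").option("inferSchema", "true").csv('jfk_weather.csv')
      # register a corresponding query table
      df.createOrReplaceTempView('df')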

      The dataset contains some null values, so schema inference didn’t work properly for all columns. In addition, some columns contain stray trailing characters (such as "s" or "*"), so we need to clean up the dataset first. This is a normal task in any data science project, since your data is never clean. Don’t worry if you don’t understand all of the code; you won’t be asked about it.

      from pyspark.sql.functions import translate, col
      # cast numeric columns and strip stray characters ("s", ",", "*") before casting the rest
      df_cleaned = df \
          .withColumn("HOURLYWindSpeed", df.HOURLYWindSpeed.cast('double')) \
          .withColumn("HOURLYWindDirection", df.HOURLYWindDirection.cast('double')) \
          .withColumn("HOURLYStationPressure", translate(col("HOURLYStationPressure"), "s,", "")) \
          .withColumn("HOURLYPrecip", translate(col("HOURLYPrecip"), "s,", "")) \
          .withColumn("HOURLYRelativeHumidity", translate(col("HOURLYRelativeHumidity"), "*", "")) \
          .withColumn("HOURLYDRYBULBTEMPC", translate(col("HOURLYDRYBULBTEMPC"), "*", ""))
      df_cleaned = df_cleaned \
          .withColumn("HOURLYStationPressure", df_cleaned.HOURLYStationPressure.cast('double')) \
          .withColumn("HOURLYPrecip", df_cleaned.HOURLYPrecip.cast('double')) \
          .withColumn("HOURLYRelativeHumidity", df_cleaned.HOURLYRelativeHumidity.cast('double')) \
          .withColumn("HOURLYDRYBULBTEMPC", df_cleaned.HOURLYDRYBULBTEMPC.cast('double'))
      # keep rows where every relevant column is non-zero (a NULL comparison also drops the row)
      df_filtered = df_cleaned.filter("""
          HOURLYWindSpeed <> 0
          and HOURLYWindDirection <> 0
          and HOURLYStationPressure <> 0
          and HOURLYPressureTendency <> 0
          and HOURLYPrecip <> 0
          and HOURLYRelativeHumidity <> 0
          and HOURLYDRYBULBTEMPC <> 0
      """)
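To make the cleaning step concrete, here is a plain-Python sketch (no Spark needed) of what the `translate` + `cast('double')` combination and the `<> 0` filter do to individual values. The helper names are hypothetical and the sample values are made up. The sketch follows Spark SQL semantics as I understand them: characters in the matching string are replaced positionally and deleted when the replacement string is shorter, a failed cast yields NULL, and a comparison involving NULL is never TRUE, so the filter drops the row. It is an illustration, not Spark's implementation.

```python
def translate_like_spark(value, matching, replace):
    """Replace each character found in `matching` by the character at the same
    position in `replace`; delete it when `replace` is shorter (like SQL translate)."""
    if value is None:
        return None
    out = []
    for ch in value:
        idx = matching.find(ch)
        if idx == -1:
            out.append(ch)            # character not targeted: keep it
        elif idx < len(replace):
            out.append(replace[idx])  # positional replacement
        # otherwise there is no replacement character, so the character is dropped
    return "".join(out)


def cast_to_double(value):
    """Return a float, or None (SQL NULL) when the string is not numeric."""
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        return None


def passes_nonzero_filter(*values):
    """`x <> 0 AND ...` keeps a row only if every comparison is TRUE;
    a NULL operand makes its comparison NULL, which is not TRUE."""
    return all(v is not None and v != 0 for v in values)


# Hypothetical raw values in the style of the dataset
pressure = cast_to_double(translate_like_spark("29.92s", "s,", ""))
precip = cast_to_double(translate_like_spark("T", "s,", ""))  # a non-numeric marker
print(pressure, precip)
print(passes_nonzero_filter(pressure, 5.0))
print(passes_nonzero_filter(pressure, precip))
```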


      We want to predict the value of one column based on some others. Printing a correlation matrix can help identify which columns are promising predictors.

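      from pyspark.ml.feature import VectorAssembler
      from pyspark.ml.stat import Correlation
      # combine the candidate columns into a single vector column
      vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYWindDirection","HOURLYStationPressure","HOURLYPressureTendency"],
                                        outputCol="features")
      df_pipeline = vectorAssembler.transform(df_filtered)
      # Pearson correlation matrix of the assembled columns
      print(Correlation.corr(df_pipeline, "features").head()[0].toArray())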
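`Correlation.corr` computes Pearson correlation by default. As a minimal plain-Python sketch of that formula (hypothetical helper names, toy data rather than the JFK dataset), each matrix entry is the covariance of two columns divided by the product of their standard deviations. The sketch assumes no column is constant, since a zero standard deviation would make the ratio undefined.

```python
import math


def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    std_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    std_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (std_x * std_y)  # the 1/n normalization factors cancel out


def correlation_matrix(columns):
    """Pairwise Pearson correlations; `columns` is a list of equally long lists."""
    return [[pearson(a, b) for b in columns] for a in columns]


toy = [[1.0, 2.0, 3.0, 4.0], [2.0, 4.0, 6.0, 8.5], [4.0, 3.0, 2.0, 1.0]]
for row in correlation_matrix(toy):
    print([round(v, 3) for v in row])
```

Values near +1 or -1 indicate a strong linear relationship; values near 0 indicate little linear relationship.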

      As we can see, HOURLYWindSpeed correlates only weakly with HOURLYWindDirection (0.06306013) but noticeably, and negatively, with HOURLYStationPressure (-0.4204518). The latter is a good sign if we want to predict HOURLYWindSpeed from HOURLYWindDirection and HOURLYStationPressure. Since this is supervised learning, let’s split our data into a training set (80%) and a test set (20%).

      splits = df_filtered.randomSplit([0.8, 0.2])
      df_train = splits[0]
      df_test = splits[1]
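`randomSplit` does not cut the data at an exact 80/20 boundary. As far as I understand its behavior, each row is assigned independently at random according to the normalized weights, so the split sizes are only approximately 80% and 20% and change between runs unless a seed is passed (e.g. `df_filtered.randomSplit([0.8, 0.2], seed=42)`). The plain-Python sketch below illustrates that idea with a hypothetical `random_split` helper; it is not Spark's implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability proportional to its weight."""
    total = float(sum(weights))
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split's interval in [0, 1)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding of the last bound
    return splits


train, test = random_split(list(range(1000)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 800 and 200
```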

      Again, we can reuse our feature engineering pipeline.

      from pyspark.ml.feature import StringIndexer, OneHotEncoder
      from pyspark.ml.linalg import Vectors
      from pyspark.ml.feature import VectorAssembler
      from pyspark.ml.feature import Normalizer
      from pyspark.ml import Pipeline
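      # predict HOURLYWindSpeed from wind direction and station pressure
      vectorAssembler = VectorAssembler(inputCols=["HOURLYWindDirection","HOURLYStationPressure"],
                                        outputCol="features")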
      normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
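With `p=1.0`, the `Normalizer` rescales each row's feature vector so that the absolute values of its entries sum to 1 (the L1 norm). A plain-Python sketch of that per-row operation, with a hypothetical helper name and made-up numbers:

```python
def l1_normalize(vector):
    """Divide every entry by the L1 norm (sum of absolute values) of the vector."""
    norm = sum(abs(v) for v in vector)
    if norm == 0.0:
        return list(vector)  # in this sketch an all-zero vector is left as is
    return [v / norm for v in vector]


print(l1_normalize([270.0, 30.0]))  # [0.9, 0.1]
```

Because the normalization works per row, it does not bring columns with very different ranges onto a common scale; that is the job of a column-wise scaler such as `StandardScaler`.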

      Now we define a function for evaluating our regression prediction performance. We’re using RMSE (root mean squared error) here: the smaller, the better.

      def regression_metrics(prediction):
          from pyspark.ml.evaluation import RegressionEvaluator
          evaluator = RegressionEvaluator(
              labelCol="HOURLYWindSpeed", predictionCol="prediction", metricName="rmse")
          rmse = evaluator.evaluate(prediction)
          print("RMSE on test data = %g" % rmse)
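For reference, RMSE is the square root of the mean squared difference between label and prediction. A minimal plain-Python version (hypothetical helper, toy numbers) makes the formula concrete:

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((label - prediction)^2))."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


print(rmse([0.0, 0.0], [3.0, 4.0]))  # sqrt((9 + 16) / 2), about 3.5355
```

Because RMSE is expressed in the label's own units, it can be read directly as a typical wind speed error.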

      Let’s first run a linear regression model to build a baseline.

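      from pyspark.ml.regression import LinearRegression
      lr = LinearRegression(labelCol="HOURLYWindSpeed", featuresCol='features_norm', maxIter=100, regParam=0.0, elasticNetParam=0.0)
      pipeline = Pipeline(stages=[vectorAssembler, normalizer, lr])
      # fit on the training split, evaluate on the held-out test split
      model = pipeline.fit(df_train)
      prediction = model.transform(df_test)
      regression_metrics(prediction)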
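With `regParam=0.0` there is no regularization penalty, so the linear regression fits ordinary least squares. For a single feature, least squares has a closed form: slope = cov(x, y) / var(x) and intercept = mean(y) - slope * mean(x). The sketch below (hypothetical helper, toy data) shows only that one-feature case; Spark handles many features and may use an iterative solver, so this illustrates the objective rather than Spark's code.

```python
def fit_simple_ols(xs, ys):
    """Closed-form least squares for y ~ intercept + slope * x (one feature, no regularization)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    slope = cov_xy / var_x
    intercept = mean_y - slope * mean_x
    return intercept, slope


intercept, slope = fit_simple_ols([1.0, 2.0, 3.0, 4.0], [3.0, 5.0, 7.0, 9.0])
print(intercept, slope)  # 1.0 2.0
```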

      Now we’ll try a Gradient Boosted Tree Regressor.

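      from pyspark.ml.regression import GBTRegressor
      # featuresCol defaults to "features", so the trees see the un-normalized vector
      gbt = GBTRegressor(labelCol="HOURLYWindSpeed", maxIter=100)
      pipeline = Pipeline(stages=[vectorAssembler, normalizer, gbt])
      model = pipeline.fit(df_train)
      prediction = model.transform(df_test)
      regression_metrics(prediction)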
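Gradient boosting builds an ensemble of small trees sequentially: each new tree is fitted to the residuals of the current ensemble, and its output is added with a small step size (`GBTRegressor` exposes this as `stepSize`, alongside `maxIter` for the number of trees). The plain-Python sketch below is the textbook version for squared loss, using one-split trees (stumps) on a single feature and starting from the mean label. The helper names are hypothetical, and Spark's implementation differs in details such as tree depth and initialization.

```python
def fit_stump(xs, residuals):
    """Least-squares one-split tree on one feature: (threshold, left_value, right_value)."""
    pairs = sorted(zip(xs, residuals))
    best = None
    for i in range(1, len(pairs)):
        if pairs[i - 1][0] == pairs[i][0]:
            continue  # no threshold separates equal feature values
        threshold = (pairs[i - 1][0] + pairs[i][0]) / 2
        left = [r for _, r in pairs[:i]]
        right = [r for _, r in pairs[i:]]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = sum((r - left_mean) ** 2 for r in left) + sum((r - right_mean) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    if best is None:  # all feature values identical: predict the mean residual everywhere
        mean = sum(residuals) / len(residuals)
        return float("inf"), mean, mean
    return best[1], best[2], best[3]


def predict_stump(stump, x):
    threshold, left_value, right_value = stump
    return left_value if x <= threshold else right_value


def fit_gbt(xs, ys, n_trees=100, step_size=0.1):
    """Squared-loss gradient boosting: each stump is fitted to the current residuals."""
    base = sum(ys) / len(ys)
    predictions = [base] * len(ys)
    stumps = []
    for _ in range(n_trees):
        # residuals are the negative gradient of squared loss (up to a constant factor)
        residuals = [y - p for y, p in zip(ys, predictions)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        predictions = [p + step_size * predict_stump(stump, x) for p, x in zip(predictions, xs)]
    return base, stumps, step_size


def predict_gbt(model, x):
    base, stumps, step_size = model
    return base + step_size * sum(predict_stump(s, x) for s in stumps)


xs = [float(i) for i in range(1, 11)]
ys = [0.0] * 5 + [10.0] * 5  # toy step-shaped target
model = fit_gbt(xs, ys)
print(round(predict_gbt(model, 2.0), 3), round(predict_gbt(model, 9.0), 3))
```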

      Now let’s switch gears. Previously, we tried to predict HOURLYWindSpeed; now we predict HOURLYWindDirection. To turn this into a classification problem, we discretize the value using a Bucketizer: with the splits used below, directions from 0 up to (but excluding) 180 become class 0, and directions of 180 or more become class 1. The new feature is called HOURLYWindDirectionBucketized.

      from pyspark.ml.feature import Bucketizer, OneHotEncoder
      # two buckets: [0, 180) -> 0.0 and [180, +inf] -> 1.0
      bucketizer = Bucketizer(splits=[0, 180, float('Inf')], inputCol="HOURLYWindDirection", outputCol="HOURLYWindDirectionBucketized")
      encoder = OneHotEncoder(inputCol="HOURLYWindDirectionBucketized", outputCol="HOURLYWindDirectionOHE")
      def classification_metrics(prediction):
          from pyspark.ml.evaluation import MulticlassClassificationEvaluator
          mcEval = MulticlassClassificationEvaluator().setMetricName("accuracy").setPredictionCol("prediction").setLabelCol("HOURLYWindDirectionBucketized")
          accuracy = mcEval.evaluate(prediction)
          print("Accuracy on test data = %g" % accuracy)
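To illustrate what the bucketizer and encoder produce, here is a plain-Python sketch with hypothetical helper names. Following the `Bucketizer` documentation, each bucket covers [lower, upper), except that the last bucket also includes its upper bound, and out-of-range values are errors under the default `handleInvalid="error"`; NaN handling is not modeled here. The one-hot helper mimics `OneHotEncoder`'s default `dropLast=True`, which drops the last category, so two buckets encode to a vector of length 1.

```python
import bisect


def bucketize(value, splits):
    """Return the bucket index (as a float) of `value` for sorted split points."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside the splits {splits}")
    if value == splits[-1]:
        return float(len(splits) - 2)  # the last bucket includes its upper bound
    return float(bisect.bisect_right(splits, value) - 1)


def one_hot(index, num_categories, drop_last=True):
    """One-hot encode a category index; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector


splits = [0, 180, float("inf")]
for direction in [10.0, 179.9, 180.0, 350.0]:
    bucket = bucketize(direction, splits)
    print(direction, bucket, one_hot(bucket, num_categories=2))
```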

      Again, we use LogisticRegression as a baseline.

      from pyspark.ml.classification import LogisticRegression
      lr = LogisticRegression(labelCol="HOURLYWindDirectionBucketized", maxIter=30)
      vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC"],
                                        outputCol="features")
      pipeline = Pipeline(stages=[bucketizer, vectorAssembler, normalizer, lr])
      model = pipeline.fit(df_train)
      prediction = model.transform(df_test)
      classification_metrics(prediction)
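For a binary label like HOURLYWindDirectionBucketized, logistic regression turns a weighted sum of the features into a probability with the sigmoid function and predicts class 1 when that probability exceeds a threshold (0.5 is the default `threshold` in Spark's `LogisticRegression`). Accuracy, the metric reported by `classification_metrics`, is the fraction of correct predictions. The weights and rows in this plain-Python sketch are made up; in the pipeline, the weights are learned by `fit`.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def predict_label(weights, intercept, features, threshold=0.5):
    """Class 1.0 if the predicted probability exceeds the threshold, else 0.0."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 if sigmoid(z) > threshold else 0.0


def accuracy(labels, predictions):
    """Fraction of predictions that equal the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Hypothetical weights and feature rows (e.g. normalized wind speed and temperature)
weights, intercept = [2.0, -1.0], 0.0
rows = [[0.8, 0.2], [0.1, 0.9], [0.6, 0.4], [0.3, 0.7]]
labels = [1.0, 0.0, 0.0, 0.0]
predictions = [predict_label(weights, intercept, r) for r in rows]
print(predictions, accuracy(labels, predictions))
```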

      Let’s try some other algorithms and see whether model performance improves. It’s also important to tune other parameters, such as those of the individual algorithms (e.g. the number of trees for RandomForest) or those of the feature engineering pipeline (e.g. the train/test split ratio, normalization, bucketing, and so on).
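Tuning such parameters by hand quickly gets tedious; a grid search tries every combination and keeps the best one. Spark ML provides this through `ParamGridBuilder` together with `CrossValidator` or `TrainValidationSplit` in `pyspark.ml.tuning`. The plain-Python sketch below shows only the idea, with a hypothetical `grid_search` helper and a made-up scoring function standing in for "train the pipeline and return its accuracy".

```python
from itertools import product


def grid_search(param_grid, score_fn):
    """Evaluate score_fn on every parameter combination; return (best_params, best_score).

    param_grid maps parameter names to lists of candidate values; higher scores are better.
    """
    names = list(param_grid)
    best_params, best_score = None, float("-inf")
    for values in product(*(param_grid[n] for n in names)):
        params = dict(zip(names, values))
        score = score_fn(params)
        if score > best_score:
            best_params, best_score = params, score
    return best_params, best_score


# Made-up score standing in for "fit the pipeline with these params, return accuracy"
def toy_score(params):
    return -abs(params["numTrees"] - 20) - abs(params["maxDepth"] - 5)


grid = {"numTrees": [10, 20, 50], "maxDepth": [3, 5, 8]}
print(grid_search(grid, toy_score))
```

When tuning for real, it is safer to score each combination on a validation split or with cross-validation, keeping the test set for the final evaluation only.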

      from pyspark.ml.classification import RandomForestClassifier
      rf = RandomForestClassifier(labelCol="HOURLYWindDirectionBucketized", numTrees=10)
      vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                        outputCol="features")
      pipeline = Pipeline(stages=[bucketizer, vectorAssembler, normalizer, rf])
      model = pipeline.fit(df_train)
      prediction = model.transform(df_test)
      classification_metrics(prediction)

      from pyspark.ml.classification import GBTClassifier
      gbt = GBTClassifier(labelCol="HOURLYWindDirectionBucketized", maxIter=100)
      vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                        outputCol="features")
      pipeline = Pipeline(stages=[bucketizer, vectorAssembler, normalizer, gbt])
      model = pipeline.fit(df_train)
      prediction = model.transform(df_test)
      classification_metrics(prediction)
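A random forest trains many trees on bootstrap samples of the training data (rows drawn with replacement) and combines their predictions; `numTrees=10` controls how many trees are built. The plain-Python sketch below shows bagging with one-split trees on a single feature and a simple majority vote. The helper names are hypothetical, the sketch omits the per-split feature subsampling that random forests also use, and Spark may combine trees differently (for example by averaging class probabilities) rather than by hard voting.

```python
import random
from collections import Counter


def majority(labels):
    """Most frequent label (0.0 for an empty list)."""
    return Counter(labels).most_common(1)[0][0] if labels else 0.0


def fit_stump_classifier(xs, labels):
    """One-split classifier minimizing training errors: (threshold, left_label, right_label)."""
    pairs = sorted(zip(xs, labels))
    candidates = [float("inf")] + [
        (a[0] + b[0]) / 2 for a, b in zip(pairs, pairs[1:]) if a[0] != b[0]
    ]
    best = None
    for t in candidates:
        left = [y for x, y in pairs if x <= t]
        right = [y for x, y in pairs if x > t]
        left_label, right_label = majority(left), majority(right)
        errors = sum(y != left_label for y in left) + sum(y != right_label for y in right)
        if best is None or errors < best[0]:
            best = (errors, t, left_label, right_label)
    return best[1], best[2], best[3]


def fit_forest(xs, labels, num_trees=10, seed=0):
    """Bagging: each tree is fitted on a bootstrap sample drawn with replacement."""
    rng = random.Random(seed)
    data = list(zip(xs, labels))
    trees = []
    for _ in range(num_trees):
        sample = [rng.choice(data) for _ in data]
        trees.append(fit_stump_classifier([x for x, _ in sample], [y for _, y in sample]))
    return trees


def predict_forest(trees, x):
    """Majority vote over the individual trees' predictions."""
    votes = [left if x <= t else right for t, left, right in trees]
    return majority(votes)


xs = [float(i) for i in range(20)]
labels = [0.0 if x < 10 else 1.0 for x in xs]  # toy data, separable at x = 10
forest = fit_forest(xs, labels, num_trees=10, seed=0)
print(predict_forest(forest, 2.0), predict_forest(forest, 17.0))
```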