Managing Spark dataframes in Python

2016-12-02

Below is a quick example of using Apache Spark (2.0) DataFrames to manipulate data. The sample data is a file of JSON lines like:

```
{"description": "255/40 ZR17 94W", "ean": "EAN: 4981910401193", "season": "tires_season summer", "price": "203,98", "model": "Michelin Pilot Sport PS2 255/40 R17", "id": "MPN: 2351610"}
{"description": "225/55 R17 101V XL", "ean": "EAN: 5452000438744", "season": "tires_season summer", "price": "120,98", "model": "Pirelli P Zero 205/45 R17", "id": "MPN: 530155"}
```

The full script:

```
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import *
import re, sys


# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'spark-warehouse'

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

records_orig = spark.read.json("scraped_tyres_data.json")

## removing bad records
records = records_orig \
  .filter(records_orig.id != '') \
  .filter(regexp_extract('description', '(rinnovati)', 1) == '')

## saving bad records
records_orig.subtract(records).coalesce(1).write.csv("bad-records.csv", sep=";")

# extract new features
regexp_size = "(\\d+)/(\\d+) R(\\d+) (\\d+)(\\w+)\\s*"
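# e.g. "225/55 R17 101V XL" matches as:
#   group 1 -> 225 (width), group 2 -> 55 (ratio), group 3 -> 17 (diameter),
#   group 4 -> 101 (load index), group 5 -> V (speed index)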

records = records \
  .withColumn("width",       regexp_extract("description", regexp_size, 1)) \
  .withColumn("ratio",       regexp_extract("description", regexp_size, 2)) \
  .withColumn("diameter",    regexp_extract("description", regexp_size, 3)) \
  .withColumn("load_index",  regexp_extract("description", regexp_size, 4)) \
  .withColumn("speed_index", regexp_extract("description", regexp_size, 5)) \
  .withColumn("brand",       regexp_extract("model", "^(\\w+) ", 1)) \
  .withColumn("season",      trim(regexp_replace("season", "tires_season", ""))) \
  .withColumn("id",          trim(regexp_replace("id", "MPN: ", ""))) \
  .withColumn("ean",         trim(regexp_replace("ean", "EAN: ", ""))) \
  .withColumn("runflat",     regexp_extract("description", "(runflat)", 1)) \
  .withColumn("mfs",         regexp_extract("description", "(MFS|FSL|bordo di protezione|bordino di protezione)", 1)) \
  .withColumn("xl",          regexp_extract("description", " (XL|RF)\\s*", 1)) \
  .withColumn("chiodabile",  regexp_extract("description", "(chiodabile)\\s*", 1))

## extracting and saving all season values
records.select("season").distinct().coalesce(1).write.csv("season\_values", sep=";")

# misc
# records.columns   # show columns
# records.groupBy("brand").count().show()
# records.groupBy("brand").count().filter("count > 100").show(20,False)
#
# renaming all columns before joining dataframes with the same column names
# records_renamed = records.select(*(col(x).alias(x + '_renamed') for x in records.columns))
# join two dataframes
# records.join(records_renamed, records.ean == records_renamed.ean_renamed)
#
#
# saving data to several formats
records.coalesce(1).write.csv("result.csv", sep=";")
records.write.json("result.json")
records.write.parquet("result.parquet")
records.write.format("com.databricks.spark.avro").save("result.avro")
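
# note: each write above produces a directory (e.g. "result.csv/") of part files;
# coalesce(1) simply keeps the output in a single part file.
# the Avro writer assumes the spark-avro package is on the classpath,
# e.g. spark-submit --packages com.databricks:spark-avro_2.11:3.0.0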


```
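
As a quick check (a minimal sketch, assuming the script above has run and the same SparkSession is still available), the Parquet output can be read back and inspected:

```
# read the Parquet result back and inspect its schema and a few rows
result = spark.read.parquet("result.parquet")
result.printSchema()
result.select("brand", "width", "ratio", "diameter", "season").show(5, truncate=False)
```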
