A while back I wrote about analyzing NYC's traffic (motor vehicle) data in q/kdb+. Soon afterwards, I showed how to analyze that data in Python using the pandas library. Now I would like to analyze the same dataset again, this time in Apache Spark. As I mentioned in my last post, I am currently learning Spark, so you will be seeing a lot more posts about it in the near future.
If you don’t have Spark installed, please see my previous post on how to set it up on AWS.
In this post, I will show you how to:
Load data from a CSV file
Transform a dataframe
Aggregate data
Sort data
Filter data
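The cell that loads the CSV isn't shown in this excerpt, but a minimal sketch of that step would look something like this (the file name and app name are assumptions, not the post's original values):
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession; the app name here is just a placeholder.
spark = SparkSession.builder.appName('nyc-collisions').getOrCreate()

# header=True treats the first row as column names; inferSchema=True asks Spark
# to guess column types, which is why some columns come back as integer or
# double while others stay string.
df = spark.read.csv('NYPD_Motor_Vehicle_Collisions.csv', header=True, inferSchema=True)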
Once we have data loaded into a dataframe, we can do a lot of things with it. First of all, let's take a look at the schema.
We can see that the dataframe has many columns of different data types such as string, double, and integer.
In [3]:
df.printSchema()
root
|-- DATE: string (nullable = true)
|-- TIME: string (nullable = true)
|-- BOROUGH: string (nullable = true)
|-- ZIP CODE: string (nullable = true)
|-- LATITUDE: double (nullable = true)
|-- LONGITUDE: double (nullable = true)
|-- LOCATION: string (nullable = true)
|-- ON STREET NAME: string (nullable = true)
|-- CROSS STREET NAME: string (nullable = true)
|-- OFF STREET NAME: string (nullable = true)
|-- NUMBER OF PERSONS INJURED: string (nullable = true)
|-- NUMBER OF PERSONS KILLED: integer (nullable = true)
|-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
|-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
|-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
|-- NUMBER OF CYCLIST KILLED: string (nullable = true)
|-- NUMBER OF MOTORIST INJURED: string (nullable = true)
|-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
|-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
|-- CONTRIBUTING FACTOR VEHICLE 2: string (nullable = true)
|-- CONTRIBUTING FACTOR VEHICLE 3: string (nullable = true)
|-- CONTRIBUTING FACTOR VEHICLE 4: string (nullable = true)
|-- CONTRIBUTING FACTOR VEHICLE 5: string (nullable = true)
|-- UNIQUE KEY: integer (nullable = true)
|-- VEHICLE TYPE CODE 1: string (nullable = true)
|-- VEHICLE TYPE CODE 2: string (nullable = true)
|-- VEHICLE TYPE CODE 3: string (nullable = true)
|-- VEHICLE TYPE CODE 4: string (nullable = true)
|-- VEHICLE TYPE CODE 5: string (nullable = true)
To make the dataframe easier to see and work with, we will select only a few columns.
In [4]:
df_new = df.select(['DATE', 'BOROUGH', 'NUMBER OF PERSONS INJURED', 'NUMBER OF PERSONS KILLED', 'NUMBER OF PEDESTRIANS INJURED', 'NUMBER OF PEDESTRIANS KILLED'])
In [5]:
df_new.printSchema()
root
|-- DATE: string (nullable = true)
|-- BOROUGH: string (nullable = true)
|-- NUMBER OF PERSONS INJURED: string (nullable = true)
|-- NUMBER OF PERSONS KILLED: integer (nullable = true)
|-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
|-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
Our new dataframe, df_new, has 6 columns. Apart from the DATE and BOROUGH columns, all the others have really long names. Let's rename those columns to something more concise.
In [6]:
df_new=df_new.withColumnRenamed('NUMBER OF PERSONS INJURED','PERSONS INJURED') \
.withColumnRenamed('NUMBER OF PERSONS KILLED','PERSONS KILLED') \
.withColumnRenamed('NUMBER OF PEDESTRIANS INJURED','PEDESTRIANS INJURED') \
.withColumnRenamed('NUMBER OF PEDESTRIANS KILLED','PEDESTRIANS KILLED')
# Add a new column which gives the total of PERSONS KILLED and PERSONS INJURED
df_new.withColumn('PERSONS KILLED OR INJURED', \
    (df_new['PERSONS KILLED'] + df_new['PERSONS INJURED'])) \
    .select(['BOROUGH', 'PERSONS KILLED', 'PERSONS INJURED', 'PERSONS KILLED OR INJURED']).show()
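The next paragraph refers to removing rows with null values; that cell isn't shown here, but a minimal sketch of it using PySpark's dropna would be (the exact arguments used in the original are an assumption):
# Sketch of the null-removal step referenced below; arguments are assumed.
# dropna() with no arguments drops every row that has a null in any column.
df_new = df_new.dropna()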
Now that we have removed all rows with null values, I would like to know how many distinct values of BOROUGH we have in our dataset. NYC has 5 boroughs, so if this dataset covers all of them, we should see 5 distinct values.
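A quick way to check this (a sketch, not the post's original cell):
# List the distinct boroughs present in the data.
df_new.select('BOROUGH').distinct().show()

# Or just count how many distinct values there are.
df_new.select('BOROUGH').distinct().count()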
# Sort data by multiple columns in different orders (ascending and descending)
df_new.orderBy(['PERSONS INJURED', 'PEDESTRIANS KILLED'], ascending=[0, 1]) \
    .select(['DATE', 'BOROUGH', 'PERSONS INJURED', 'PEDESTRIANS KILLED']).show()
# We can also use SQL-like syntax by first creating a view and then querying it
df_new.createOrReplaceTempView('sql_example')
spark.sql('select * from sql_example where BOROUGH = "QUEENS"').show()
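For comparison, the same filter can be written with the DataFrame API (a sketch, not from the original post):
# Equivalent filter using the DataFrame API instead of SQL.
df_new.filter(df_new['BOROUGH'] == 'QUEENS').show()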
# Here is another example where I am doing aggregation using the SQL syntax.
# Note: column names containing spaces need backticks in Spark SQL; double
# quotes would be parsed as a string literal rather than a column reference.
spark.sql('select BOROUGH, count(`PERSONS INJURED`) from sql_example group by BOROUGH').show()
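The equivalent aggregation with the DataFrame API might look like this (again a sketch rather than the post's original cell):
# Count rows per borough using groupBy instead of SQL.
df_new.groupBy('BOROUGH').count().show()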