In a previous post, we saw how to install R, RStudio Server, and R packages on an AWS EC2 Red Hat cluster for use with the Hortonworks Data Platform (HDP 2.4) Hadoop distribution. Now, let's use SparkR for data munging.


Starting Up SparkR from RStudio

To connect to a Spark cluster from within RStudio, we have to set the SPARK_HOME environment variable. We can also specify additional packages to load when Spark starts. I am using spark-csv, a library for parsing and querying CSV data with Spark SQL and DataFrames.

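# Point R at the HDP Spark client and ask spark-submit to fetch the spark-csv package
Sys.setenv(SPARK_HOME="/usr/hdp/current/spark-client/",
           'SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.11:1.4.0" "sparkr-shell"')

# Put the SparkR package that ships with Spark on the library search path
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))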

When you attach a package in R, its functions can mask functions with the same names from packages attached earlier. Because parts of SparkR are modeled on the dplyr package, several SparkR functions share names with dplyr functions, and whichever package is loaded last wins. In such cases, prefix the call with the package name, for instance SparkR::cume_dist(x) or dplyr::cume_dist(x). Let's load SparkR after the other packages so that its functions are not masked.
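Masking is easy to see with base R alone. In this minimal sketch, a throwaway filter() defined in the workspace (hypothetical, for illustration only) masks stats::filter: the unqualified name picks up the newest definition, while the stats:: prefix still reaches the original.

```r
# A throwaway function that masks stats::filter (illustration only)
filter <- function(x, ...) "my filter"

# find() lists the attached environments that define the name, in search order;
# run at the top level, ".GlobalEnv" is listed before "package:stats"
where_found <- utils::find("filter")
print(where_found)

# The unqualified call resolves to the masking definition
masked_result <- filter(1:3)

# A package-qualified call bypasses the mask: a 3-point moving average
original_result <- stats::filter(1:5, rep(1 / 3, 3))
print(as.numeric(original_result))

# Remove the mask so later code sees stats::filter again
rm(filter)
```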


Load Packages

library(dplyr)

Attaching package: 'dplyr'
The following objects are masked from 'package:stats':

    filter, lag
The following objects are masked from 'package:base':

    intersect, setdiff, setequal, union
library(ggplot2)
library(magrittr)
library(SparkR)

Attaching package: 'SparkR'
The following objects are masked from 'package:dplyr':

    arrange, between, collect, contains, count, cume_dist,
    dense_rank, desc, distinct, explain, filter, first, group_by,
    intersect, lag, last, lead, mutate, n, n_distinct, ntile,
    percent_rank, rename, row_number, sample_frac, select, sql,
    summarize
The following objects are masked from 'package:stats':

    cov, filter, lag, na.omit, predict, sd, var
The following objects are masked from 'package:base':

    colnames, colnames<-, endsWith, intersect, rank, rbind,
    sample, startsWith, subset, summary, table, transform

Starting Up SparkContext and SQLContext

The entry point into SparkR is the SparkContext, which connects our R program to a Spark cluster. We create a SparkContext using sparkR.init. To work with DataFrames we also need a SQLContext, which is created from the SparkContext with sparkRSQL.init.

sc <- SparkR::sparkR.init()
Launching java with spark-submit command /usr/hdp/current/spark-client//bin/spark-submit   "--packages" "com.databricks:spark-csv_2.11:1.4.0" "sparkr-shell" /tmp/RtmppEk4US/backend_portcec6f3fc42 
sqlContext <- sparkRSQL.init(sc)

SparkR DataFrames support operations such as filtering, grouping, aggregation, and summary statistics. These operations run on the Spark cluster, so they can use multiple cores and machines and scale to larger data than standalone R.

I am using the flights data. I downloaded the data from here and ingested it into the Hadoop Distributed File System (HDFS). You can read my blog post on data ingestion into HDFS here.


Load the flights data

Let's load the flights CSV file using read.df, with spark-csv as the data source. Since we don't set spark-csv's inferSchema option, every column is read as a string.

flights <- read.df(sqlContext, "hdfs:///tmp/nycflights13.csv", "com.databricks.spark.csv", header="true")
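For a local picture of what schema inference changes, here is a base R sketch (not SparkR) that reads a few values from the first three flights rows once with every column kept as a string and once with type conversion. The column subset and the csv_text variable are just for illustration.

```r
# A few values from the first three flights rows (subset of columns)
csv_text <- "year,month,day,dep_delay,carrier
2013,1,1,2,UA
2013,1,1,4,UA
2013,1,1,2,AA"

# No type inference: keep every column as a string
as_strings <- read.csv(text = csv_text, colClasses = "character")

# With type inference (read.csv's default behaviour)
inferred <- read.csv(text = csv_text, stringsAsFactors = FALSE)

str(as_strings)  # every column is chr
str(inferred)    # year, month, day, dep_delay become int; carrier stays chr
```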

Quick Exploration

Let's look at the class, dimensions, and first few records of flights. Apart from printSchema, these are familiar base R functions, and SparkR provides versions of them that work on a Spark DataFrame.

class(flights)          # flights is a SparkR DataFrame, not an R data.frame
[1] "DataFrame"
attr(,"package")
[1] "SparkR"
printSchema(flights)    # Prints out the schema in tree format 
root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
str(flights)
'DataFrame': 16 variables:
 $ year     : chr "2013" "2013" "2013" "2013" "2013" "2013"
 $ month    : chr "1" "1" "1" "1" "1" "1"
 $ day      : chr "1" "1" "1" "1" "1" "1"
 $ dep_time : chr "517" "533" "542" "544" "554" "554"
 $ dep_delay: chr "2" "4" "2" "-1" "-6" "-4"
 $ arr_time : chr "830" "850" "923" "1004" "812" "740"
 $ arr_delay: chr "11" "20" "33" "-18" "-25" "12"
 $ carrier  : chr "UA" "UA" "AA" "B6" "DL" "UA"
 $ tailnum  : chr "N14228" "N24211" "N619AA" "N804JB" "N668DN" "N39463"
 $ flight   : chr "1545" "1714" "1141" "725" "461" "1696"
 $ origin   : chr "EWR" "LGA" "JFK" "JFK" "LGA" "EWR"
 $ dest     : chr "IAH" "IAH" "MIA" "BQN" "ATL" "ORD"
 $ air_time : chr "227" "227" "160" "183" "116" "150"
 $ distance : chr "1400" "1416" "1089" "1576" "762" "719"
 $ hour     : chr "5" "5" "5" "5" "5" "5"
 $ minute   : chr "17" "33" "42" "44" "54" "54"
head(flights,num=10)    # Returns the first num rows of a DataFrame as an R data.frame
   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1  2013     1   1      517         2      830        11      UA  N14228
2  2013     1   1      533         4      850        20      UA  N24211
3  2013     1   1      542         2      923        33      AA  N619AA
4  2013     1   1      544        -1     1004       -18      B6  N804JB
5  2013     1   1      554        -6      812       -25      DL  N668DN
6  2013     1   1      554        -4      740        12      UA  N39463
7  2013     1   1      555        -5      913        19      B6  N516JB
8  2013     1   1      557        -3      709       -14      EV  N829AS
9  2013     1   1      557        -3      838        -8      B6  N593JB
10 2013     1   1      558        -2      753         8      AA  N3ALAA
   flight origin dest air_time distance hour minute
1    1545    EWR  IAH      227     1400    5     17
2    1714    LGA  IAH      227     1416    5     33
3    1141    JFK  MIA      160     1089    5     42
4     725    JFK  BQN      183     1576    5     44
5     461    LGA  ATL      116      762    5     54
6    1696    EWR  ORD      150      719    5     54
7     507    EWR  FLL      158     1065    5     55
8    5708    LGA  IAD       53      229    5     57
9      79    JFK  MCO      140      944    5     57
10    301    LGA  ORD      138      733    5     58
                        # If num is not specified, head() returns the first 6 rows
dim(flights)            # Returns the dimensions (number of rows and columns) of a DataFrame
[1] 336776     16

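We can also use showDF to print the first numRows rows of a DataFrame (numRows defaults to 20). Setting truncate = FALSE prints long strings in full instead of cutting them off at 20 characters.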

showDF(flights,numRows = 25, truncate = FALSE)
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2013|1    |1  |517     |2        |830     |11       |UA     |N14228 |1545  |EWR   |IAH |227     |1400    |5   |17    |
|2013|1    |1  |533     |4        |850     |20       |UA     |N24211 |1714  |LGA   |IAH |227     |1416    |5   |33    |
|2013|1    |1  |542     |2        |923     |33       |AA     |N619AA |1141  |JFK   |MIA |160     |1089    |5   |42    |
|2013|1    |1  |544     |-1       |1004    |-18      |B6     |N804JB |725   |JFK   |BQN |183     |1576    |5   |44    |
|2013|1    |1  |554     |-6       |812     |-25      |DL     |N668DN |461   |LGA   |ATL |116     |762     |5   |54    |
|2013|1    |1  |554     |-4       |740     |12       |UA     |N39463 |1696  |EWR   |ORD |150     |719     |5   |54    |
|2013|1    |1  |555     |-5       |913     |19       |B6     |N516JB |507   |EWR   |FLL |158     |1065    |5   |55    |
|2013|1    |1  |557     |-3       |709     |-14      |EV     |N829AS |5708  |LGA   |IAD |53      |229     |5   |57    |
|2013|1    |1  |557     |-3       |838     |-8       |B6     |N593JB |79    |JFK   |MCO |140     |944     |5   |57    |
|2013|1    |1  |558     |-2       |753     |8        |AA     |N3ALAA |301   |LGA   |ORD |138     |733     |5   |58    |
|2013|1    |1  |558     |-2       |849     |-2       |B6     |N793JB |49    |JFK   |PBI |149     |1028    |5   |58    |
|2013|1    |1  |558     |-2       |853     |-3       |B6     |N657JB |71    |JFK   |TPA |158     |1005    |5   |58    |
|2013|1    |1  |558     |-2       |924     |7        |UA     |N29129 |194   |JFK   |LAX |345     |2475    |5   |58    |
|2013|1    |1  |558     |-2       |923     |-14      |UA     |N53441 |1124  |EWR   |SFO |361     |2565    |5   |58    |
|2013|1    |1  |559     |-1       |941     |31       |AA     |N3DUAA |707   |LGA   |DFW |257     |1389    |5   |59    |
|2013|1    |1  |559     |0        |702     |-4       |B6     |N708JB |1806  |JFK   |BOS |44      |187     |5   |59    |
|2013|1    |1  |559     |-1       |854     |-8       |UA     |N76515 |1187  |EWR   |LAS |337     |2227    |5   |59    |
|2013|1    |1  |600     |0        |851     |-7       |B6     |N595JB |371   |LGA   |FLL |152     |1076    |6   |0     |
|2013|1    |1  |600     |0        |837     |12       |MQ     |N542MQ |4650  |LGA   |ATL |134     |762     |6   |0     |
|2013|1    |1  |601     |1        |844     |-6       |B6     |N644JB |343   |EWR   |PBI |147     |1023    |6   |1     |
|2013|1    |1  |602     |-8       |812     |-8       |DL     |N971DL |1919  |LGA   |MSP |170     |1020    |6   |2     |
|2013|1    |1  |602     |-3       |821     |16       |MQ     |N730MQ |4401  |LGA   |DTW |105     |502     |6   |2     |
|2013|1    |1  |606     |-4       |858     |-12      |AA     |N633AA |1895  |EWR   |MIA |152     |1085    |6   |6     |
|2013|1    |1  |606     |-4       |837     |-8       |DL     |N3739P |1743  |JFK   |ATL |128     |760     |6   |6     |
|2013|1    |1  |607     |0        |858     |-17      |UA     |N53442 |1077  |EWR   |MIA |157     |1085    |6   |7     |
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 25 rows

Filtering Rows

The SparkR function filter keeps the rows of a DataFrame that satisfy a given condition. Note: we are using the pipe operator (%>%) from the magrittr package. We can use collect or as.data.frame to turn a Spark DataFrame into an R data.frame.

filter(flights, flights$month == 1 & flights$day == 1) %>% head(num = 7)   # select flights on January 1st and return the first 7
  year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1 2013     1   1      517         2      830        11      UA  N14228
2 2013     1   1      533         4      850        20      UA  N24211
3 2013     1   1      542         2      923        33      AA  N619AA
4 2013     1   1      544        -1     1004       -18      B6  N804JB
5 2013     1   1      554        -6      812       -25      DL  N668DN
6 2013     1   1      554        -4      740        12      UA  N39463
7 2013     1   1      555        -5      913        19      B6  N516JB
  flight origin dest air_time distance hour minute
1   1545    EWR  IAH      227     1400    5     17
2   1714    LGA  IAH      227     1416    5     33
3   1141    JFK  MIA      160     1089    5     42
4    725    JFK  BQN      183     1576    5     44
5    461    LGA  ATL      116      762    5     54
6   1696    EWR  ORD      150      719    5     54
7    507    EWR  FLL      158     1065    5     55
filter(flights, flights$month == 1 | flights$month == 2) %>% head()   # select flights in January or February
  year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1 2013     1   1      517         2      830        11      UA  N14228
2 2013     1   1      533         4      850        20      UA  N24211
3 2013     1   1      542         2      923        33      AA  N619AA
4 2013     1   1      544        -1     1004       -18      B6  N804JB
5 2013     1   1      554        -6      812       -25      DL  N668DN
6 2013     1   1      554        -4      740        12      UA  N39463
  flight origin dest air_time distance hour minute
1   1545    EWR  IAH      227     1400    5     17
2   1714    LGA  IAH      227     1416    5     33
3   1141    JFK  MIA      160     1089    5     42
4    725    JFK  BQN      183     1576    5     44
5    461    LGA  ATL      116      762    5     54
6   1696    EWR  ORD      150      719    5     54
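In a SparkR condition, & means "and" and | means "or", just as in base R, and mixing them up silently changes the result. A quick local check in base R on a handful of (month, day) pairs taken from this post's output shows how much the choice matters:

```r
# A few (month, day) pairs that appear in the flights output of this post
dates <- data.frame(month = c(1, 1, 9, 11, 2),
                    day   = c(1, 14, 1, 1, 7))

# "and": only January 1st
jan_first <- dates[dates$month == 1 & dates$day == 1, ]

# "or": every January date plus the 1st of every month
jan_or_first <- dates[dates$month == 1 | dates$day == 1, ]

nrow(jan_first)     # 1
nrow(jan_or_first)  # 4
```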

Ordering rows

We can use arrange to sort a DataFrame by the specified column(s).

arrange(flights, flights$year, flights$month, flights$day)%>%showDF(numRows =15,truncate = FALSE)
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2013|1    |1  |517     |2        |830     |11       |UA     |N14228 |1545  |EWR   |IAH |227     |1400    |5   |17    |
|2013|1    |1  |533     |4        |850     |20       |UA     |N24211 |1714  |LGA   |IAH |227     |1416    |5   |33    |
|2013|1    |1  |542     |2        |923     |33       |AA     |N619AA |1141  |JFK   |MIA |160     |1089    |5   |42    |
|2013|1    |1  |544     |-1       |1004    |-18      |B6     |N804JB |725   |JFK   |BQN |183     |1576    |5   |44    |
|2013|1    |1  |554     |-6       |812     |-25      |DL     |N668DN |461   |LGA   |ATL |116     |762     |5   |54    |
|2013|1    |1  |554     |-4       |740     |12       |UA     |N39463 |1696  |EWR   |ORD |150     |719     |5   |54    |
|2013|1    |1  |555     |-5       |913     |19       |B6     |N516JB |507   |EWR   |FLL |158     |1065    |5   |55    |
|2013|1    |1  |557     |-3       |709     |-14      |EV     |N829AS |5708  |LGA   |IAD |53      |229     |5   |57    |
|2013|1    |1  |557     |-3       |838     |-8       |B6     |N593JB |79    |JFK   |MCO |140     |944     |5   |57    |
|2013|1    |1  |558     |-2       |753     |8        |AA     |N3ALAA |301   |LGA   |ORD |138     |733     |5   |58    |
|2013|1    |1  |558     |-2       |849     |-2       |B6     |N793JB |49    |JFK   |PBI |149     |1028    |5   |58    |
|2013|1    |1  |558     |-2       |853     |-3       |B6     |N657JB |71    |JFK   |TPA |158     |1005    |5   |58    |
|2013|1    |1  |558     |-2       |924     |7        |UA     |N29129 |194   |JFK   |LAX |345     |2475    |5   |58    |
|2013|1    |1  |558     |-2       |923     |-14      |UA     |N53441 |1124  |EWR   |SFO |361     |2565    |5   |58    |
|2013|1    |1  |559     |-1       |941     |31       |AA     |N3DUAA |707   |LGA   |DFW |257     |1389    |5   |59    |
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 15 rows
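# desc() and asc() set the sort direction for each column.
# month is a string column, so desc() sorts it lexicographically ("9" comes before "12").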
arrange(flights, flights$year, desc(flights$month), asc(flights$day))%>%showDF(numRows =15,truncate = FALSE)
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2013|9    |1  |9       |10       |343     |3        |B6     |N663JB |839   |JFK   |BQN |196     |1576    |0   |9     |
|2013|9    |1  |600     |-10      |824     |-31      |AA     |N3JTAA |705   |LGA   |DFW |185     |1389    |6   |0     |
|2013|9    |1  |117     |152      |218     |139      |B6     |N216JB |1816  |JFK   |SYR |44      |209     |1   |17    |
|2013|9    |1  |508     |-8       |717     |-43      |UA     |N57869 |1545  |EWR   |IAH |170     |1400    |5   |8     |
|2013|9    |1  |537     |-8       |849     |-6       |AA     |N613AA |701   |JFK   |MIA |148     |1089    |5   |37    |
|2013|9    |1  |537     |-8       |906     |-15      |B6     |N658JB |939   |JFK   |BQN |191     |1576    |5   |37    |
|2013|9    |1  |549     |-11      |815     |-35      |UA     |N27213 |1115  |EWR   |TPA |133     |997     |5   |49    |
|2013|9    |1  |552     |-8       |843     |-22      |AA     |N3KFAA |1895  |LGA   |MIA |149     |1096    |5   |52    |
|2013|9    |1  |553     |-7       |809     |-25      |B6     |N784JB |27    |EWR   |MCO |117     |937     |5   |53    |
|2013|9    |1  |554     |-6       |700     |-16      |EV     |N14180 |6167  |LGA   |IAD |39      |229     |5   |54    |
|2013|9    |1  |554     |-6       |803     |-20      |DL     |N909DE |461   |LGA   |ATL |105     |762     |5   |54    |
|2013|9    |1  |554     |-6       |657     |-18      |MQ     |N516MQ |3267  |EWR   |ORD |104     |719     |5   |54    |
|2013|9    |1  |555     |-5       |835     |-16      |B6     |N639JB |371   |LGA   |FLL |139     |1076    |5   |55    |
|2013|9    |1  |557     |-3       |706     |-10      |EV     |N909EV |5716  |JFK   |IAD |43      |228     |5   |57    |
|2013|9    |1  |557     |-3       |718     |-22      |WN     |N553WN |726   |LGA   |STL |125     |888     |5   |57    |
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 15 rows
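The output above starts with September because the string month column is compared character by character. This base R sketch reproduces both orders on a few month strings; method = "radix" sorts in C-locale byte order, which is close to how Spark compares strings. In SparkR, casting the column first, for example desc(cast(flights$month, "integer")), should give calendar order, though I have not shown that run here.

```r
# Month values stored as strings, as in the flights DataFrame
months <- c("1", "9", "10", "12", "2")

# String order: "9" ranks above "12" because "9" > "1"
lexicographic <- sort(months, decreasing = TRUE, method = "radix")
print(lexicographic)   # "9" "2" "12" "10" "1"

# Numeric order: convert to integers before sorting
numeric_order <- months[order(as.integer(months), decreasing = TRUE)]
print(numeric_order)   # "12" "10" "9" "2" "1"
```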

Selecting columns

select selects a set of columns with names or Column expressions.

select(flights, flights$year, flights$month, flights$day)%>%showDF(numRows =15,truncate = FALSE)
+----+-----+---+
|year|month|day|
+----+-----+---+
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
|2013|1    |1  |
+----+-----+---+
only showing top 15 rows
# select(flights, c("year", "month", "day")) gives the same result

Getting distinct records

distinct returns a new DataFrame containing the distinct rows in this DataFrame.

distinct(select(flights, flights$tailnum))%>%dim()  # See how many distinct 'tailnum' we have
[1] 4044    1
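The base R counterpart of this distinct/select/dim pipeline is unique() on a one-column data.frame. In this small sketch (made-up rows built from tail numbers in this post), note that an empty string is counted as one more distinct value; if missing tail numbers are stored that way, as the blank tailnum in the sampling output further down suggests, the 4044 above would include it.

```r
# A few tail numbers, including a repeat and empty strings for missing values
tailnums <- data.frame(tailnum = c("N14228", "N24211", "N14228", "", "N619AA", ""),
                       stringsAsFactors = FALSE)

# Base R analogue of distinct(select(flights, flights$tailnum)) %>% dim()
dim(unique(tailnums))  # 4 1: the empty string counts as one distinct value
```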

Adding new columns

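withColumn returns a new DataFrame with the specified column added. Here gain is the arrival delay minus the departure delay. Both columns are strings, so Spark implicitly casts them to double for the subtraction, which is why gain is printed as 9.0, 16.0, and so on.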

withColumn(flights, 'gain', flights$arr_delay - flights$dep_delay) %>%
  select(c('year', 'month', 'day', 'gain')) %>%
  showDF(numRows = 8, truncate = FALSE)
+----+-----+---+-----+
|year|month|day|gain |
+----+-----+---+-----+
|2013|1    |1  |9.0  |
|2013|1    |1  |16.0 |
|2013|1    |1  |31.0 |
|2013|1    |1  |-17.0|
|2013|1    |1  |-19.0|
|2013|1    |1  |16.0 |
|2013|1    |1  |24.0 |
|2013|1    |1  |-11.0|
+----+-----+---+-----+
only showing top 8 rows
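Base R, by contrast, refuses to subtract character vectors, so a local version of the same calculation needs an explicit conversion. This sketch uses the delays of the first eight flights and reproduces the gain column above:

```r
# arr_delay and dep_delay for the first eight flights, stored as strings
arr_delay <- c("11", "20", "33", "-18", "-25", "12", "19", "-14")
dep_delay <- c("2", "4", "2", "-1", "-6", "-4", "-5", "-3")

# Subtracting strings directly is an error in base R
string_subtraction <- try(arr_delay[1] - dep_delay[1], silent = TRUE)

# Convert explicitly, then subtract
gain <- as.numeric(arr_delay) - as.numeric(dep_delay)
gain  # 9 16 31 -17 -19 16 24 -11
```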

Sampling

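sample returns a random subset of a DataFrame, with or without replacement. The fraction argument is the expected proportion of rows to keep, not an exact count, and an optional seed argument makes the result reproducible.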

collect(sample(flights, FALSE, 0.00005))  # Sample without replacement
   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1  2013     1  14      642        -3      858        12      US  N565UW
2  2013     1  25     1204        -1     1313       -12      WN  N291WN
3  2013    10  15     1758        -2     2017       -13      DL  N337NB
4  2013    12  14     2035       125      132       223      DL  N3758Y
5  2013    12  28     1733         3     2055         0      AA  N344AA
6  2013     3   5     1932         3     2129        -1      EV  N12167
7  2013     4  15      734        14     1004        29      WN  N340LV
8  2013     4  17      855        -5     1017        -8      US  N750UW
9  2013     4  23      754        -1      924        14      MQ  N804MQ
10 2013     4  27     1603        -2     1908       -16      DL  N365NB
11 2013     5   2     1558        -1     1804       -10      MQ  N659MQ
12 2013     5  16     1650        -9     1835       -31      9E  N8847A
13 2013     6   7     1117        64     1320        65      EV  N16183
14 2013     6  25     1257         6     1448        -2      DL  N369NB
15 2013     7  26     1147       -13     1250       -28      EV  N27962
16 2013     9   3     2007        -6     2222       -32      EV  N17159
17 2013     9  11       NA        NA       NA        NA      UA        
18 2013     9  14      943        -2     1048       -27      WN  N769SW
19 2013     9  19     1237        -3     1525       -20      WN  N715SW
   flight origin dest air_time distance hour minute
1     926    EWR  CLT       89      529    6     42
2     644    EWR  MDW      114      711   12      4
3     926    EWR  ATL      109      746   17     58
4     438    JFK  SAN      320     2446   20     35
5    2351    JFK  MIA      165     1089   17     33
6    4663    EWR  CHS       97      628   19     32
7      35    EWR  DEN      252     1605    7     34
8    2167    LGA  DCA       41      214    8     55
9    4418    JFK  DCA       61      213    7     54
10   1433    LGA  RSW      158     1080   16      3
11   3985    JFK  CVG       85      589   15     58
12   3881    EWR  CVG       85      569   16     50
13   4255    EWR  CHS      101      628   11     17
14   1131    LGA  DTW       78      502   12     57
15   3838    EWR  ROC       42      246   11     47
16   4141    EWR  OKC      172     1325   20      7
17    708    EWR  ORD       NA      719   NA     NA
18   1123    LGA  MDW      102      725    9     43
19     42    LGA  HOU      195     1428   12     37
collect(sample(flights, TRUE, 0.00005))  # Sample with replacement
   year month day dep_time dep_delay arr_time arr_delay carrier tailnum
1  2013    10  30      730       -15     1031       -12      B6  N630JB
2  2013    10  30     1506        -9     1808        -2      AA  N587AA
3  2013    11   1      640         0      838         0      EV  N14203
4  2013    12  19     1828        -2     2030       -17      EV  N751EV
5  2013    12  23     1023        24     1252        24      9E  N8501F
6  2013     2   5      852        -7     1136        -9      DL  N642DL
7  2013     2   7     1326        -4     1611         6      DL  N37700
8  2013     2  24      723        -9     1022       -16      UA  N429UA
9  2013     4   2      631         1      747        -2      EV  N16918
10 2013     4  14      849        -6     1035       -34      DL  N766NC
11 2013     4  19     1811        31     1950        15      WN  N785SW
12 2013     4  26     1830       118     2020       129      EV  N13161
13 2013     6  14     2228        63       44        45      UA  N440UA
14 2013     6  30      835         0      951       -14      EV  N13913
15 2013     9   1     1827        39     2058        23      B6  N638JB
16 2013     9   3     1451        -5     1645         8      9E  N8800G
   flight origin dest air_time distance hour minute
1    1717    LGA  TPA      155     1010    7     30
2     320    EWR  DFW      204     1372   15      6
3    4166    EWR  GSP       96      594    6     40
4    5268    LGA  CLT       87      544   18     28
5    3518    LGA  TYS      120      647   10     23
6    1747    LGA  ATL      114      762    8     52
7    2043    JFK  ATL      135      760   13     26
8     395    LGA  IAH      214     1416    7     23
9    4533    EWR  BUF       54      282    6     31
10   2239    EWR  DTW       87      488    8     49
11   1596    EWR  STL      135      872   18     11
12   4702    EWR  GSO       69      445   18     30
13    407    EWR  IAH      179     1400   22     28
14   5824    EWR  RIC       52      277    8     35
15    299    LGA  MCO      128      950   18     27
16   3868    JFK  ORF       51      290   14     51
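The two samples above have 19 and 16 rows, while 0.00005 times 336,776 is about 16.8. Spark treats fraction as an expected proportion: without replacement it keeps each row independently with probability fraction (Bernoulli sampling), and with replacement it draws a Poisson-distributed number of copies of each row. Here is a rough base R sketch of both schemes on a hypothetical table of 100,000 row ids; it illustrates the idea rather than Spark's actual implementation.

```r
set.seed(42)
n <- 100000          # hypothetical number of rows
fraction <- 0.01
rows <- seq_len(n)

# Without replacement: keep each row independently with probability `fraction`
bernoulli_sample <- rows[runif(n) < fraction]

# With replacement: repeat each row a Poisson(fraction) number of times
poisson_sample <- rep(rows, times = rpois(n, lambda = fraction))

length(bernoulli_sample)  # close to, but usually not exactly, 1000
length(poisson_sample)    # also close to 1000; some rows may repeat
```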

Counting number of records

Count the number of flights for each carrier and plot the counts in decreasing order:

count(groupBy(flights, "carrier")) %>% collect() %>%
  dplyr::mutate(carrier = factor(carrier, levels = carrier[order(count, decreasing = TRUE)])) %>%
  ggplot(aes(x = carrier, y = count)) + geom_bar(stat = "identity", fill = "skyblue")
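The dplyr::mutate step sorts the bars: ggplot2 draws a discrete axis in factor-level order, so the levels are set in decreasing order of count. The same reordering in base R, on a few of the carrier counts reported in the aggregation output of this post:

```r
# Flight counts per carrier, taken from the aggregation output in this post
counts <- data.frame(carrier = c("AA", "HA", "B6", "UA", "OO"),
                     count   = c(32729, 342, 54635, 58665, 32),
                     stringsAsFactors = FALSE)

# Set factor levels in decreasing order of count so the bars come out sorted
counts$carrier <- factor(counts$carrier,
                         levels = counts$carrier[order(counts$count, decreasing = TRUE)])
levels(counts$carrier)  # "UA" "B6" "AA" "HA" "OO"
```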


Aggregating

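Group the flights by carrier and aggregate. Note that max and min below operate on the string distance column, so they compare values lexicographically rather than numerically (which is why AA shows a maximum of 944 and a minimum of 1005), while mean and stddev_samp convert the strings to numbers.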

agg(group_by(flights, flights$carrier),
    count = n(flights$carrier),
    maximum = max(flights$distance),
    minimum = min(flights$distance),
    mean = mean(flights$distance),
    sample_standard_deviation = stddev_samp(flights$distance)
    ) %>% showDF()
+-------+-----+-------+-------+------------------+-------------------------+
|carrier|count|maximum|minimum|              mean|sample_standard_deviation|
+-------+-----+-------+-------+------------------+-------------------------+
|     AA|32729|    944|   1005|1340.2359986556264|         637.736152529242|
|     HA|  342|   4983|   4983|            4983.0|                      0.0|
|     AS|  714|   2402|   2402|            2402.0|                      0.0|
|     B6|54635|    997|   1005| 1068.621524663677|         703.705659773323|
|     UA|58665|    997|   1008|1529.1148725816074|        798.8046815611141|
|     US|20536|     96|     17| 553.4562719127387|        583.8225238535786|
|     OO|   32|    733|   1008|          500.8125|        206.1722482967626|
|     VX| 5162|   2586|   2248|2499.4821774506004|         88.0488899750204|
|     WN|12275|    888|   1167|  996.269083503055|       410.42960818312923|
|     DL|48110|    963|   1005|1236.9012055705675|        660.1724143675561|
|     EV|54173|    997|   1008|    562.9917301977|       287.48815905858027|
|     F9|  685|   1620|   1620|            1620.0|     1.137426313741963...|
|     9E|18460|    964|   1005|  530.235752979415|       321.79870716218636|
|     YV|  601|     96|    229| 375.0332778702163|       159.71957370363523|
|     FL| 3260|    762|    397| 664.8294478527607|        160.8880361106722|
|     MQ|26397|    888|   1005| 569.5327120506118|        226.2287070772906|
+-------+-----+-------+-------+------------------+-------------------------+
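To make the lexicographic comparison concrete, here is a base R sketch using a few distances that appear in this post's output, kept as strings. In SparkR, casting the column first, for example max(cast(flights$distance, "integer")), should give numeric results; I have not shown that run here.

```r
# A few distances from the flights output, stored as strings as in the DataFrame
distance <- c("733", "1089", "1389", "1085")

# String comparison: "733" > "1389" because "7" > "1"
max(distance)               # "733"
min(distance)               # "1085"

# Numeric comparison after conversion
max(as.numeric(distance))   # 1389
min(as.numeric(distance))   # 733
```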

Writing data to file

Save as CSV:

sampled <- sample(flights, FALSE, 0.01)
write.df(sampled, "I_sampled_it", "com.databricks.spark.csv", mode = "overwrite")

Save as Parquet:

write.df(sampled, "I_sampled_it_parquet", "parquet", "overwrite")   # a separate path keeps the CSV output intact

We can also save the contents of a DataFrame as a Parquet file, preserving the schema, using write.parquet. Files written out with this method can be read back in as a DataFrame using read.parquet().


To save the contents of a DataFrame as a JSON file, use write.json:

write.json(sampled, "I_sampled_it_json")   # the default save mode fails if the path already exists

Note: files written out with this method can be read back in as a DataFrame using read.json().


Finally, terminate SparkR

sparkR.stop()