443-970-2353
[email protected]
CV Resume
Elasticsearch is an open source distributed full text search engine and it is the most popular enterprise search engine. Kibana, on the other hand, helps us to visualize and analyze data that resides on Elasticsearch. Elasticsearch is used by many notable companies such as Facebook, Github, Quora, etc. We can analyze, visualize and search both structured and unstructured data in real time by using the Elastic Stack (Elasticsearch, Logstash and Kibana). All of them are free. Logstash is used for data collection and log parsing. We can import data to Elasticsearch from various sources using Logstash.
In this blog post, I show how to transfer data from MySQL to Elasticsearch and visualize it with Kibana. The Logstash configuration and R code are shown below. They are also availble on Github.
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.42-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/storms"
jdbc_user => "root"
jdbc_password => "mypassword"
statement => "SELECT * from storms"
}
}
filter {
mutate { convert => {"lat" => "float"} }
mutate { convert => {"long" => "float"} }
mutate { rename => {"lat" => "[location][lat]"} }
mutate { rename => {"long" => "[location][lon]"} }
date {
locale => "eng"
match => ["datetime", "yyyy-MM-dd HH:mm:ss", "ISO8601"]
target => "datetime"
}
}
output{
elasticsearch {
hosts => ["localhost:9200"]
index => "storms"
user => "elastic"
password => "changeme"
}
stdout { codec => rubydebug { metadata => true } }
# stdout { codec => dots }
}
PUT /_template/storms
{
"order": 0,
"template": "storms*",
"mappings": {
"_default_": {
"properties": {
"location": {
"type": "geo_point"
}
}
}
}
}
library(dplyr)
library(RMySQL)
my_storms = storms
my_storms$DateTime = paste0(storms$year, "-",storms$month, "-",storms$day, " ", storms$hour,":00:00")
my_storms = my_storms %>% select(name, DateTime, lat, long, status, category, wind, pressure)
mydb = dbConnect(MySQL(),
dbname = "storms",
host = "127.0.0.1",
port = 3306,
user = "root",
password = "mypassword")
dbWriteTable(mydb,'storms',
my_storms,
row.names = FALSE,
overwrite = TRUE)