I wanted to learn Elasticsearch using the Scala library Elastic4s.
First, let us look at the SBT dependencies:
name := "Elastic4sLearn"
version := "1.0"
scalaVersion := "2.12.3"
// Single place to bump the Elastic4s version for every module below
val elastic4sVersion = "5.5.3"
libraryDependencies ++= Seq(
  "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion,
  "com.sksamuel.elastic4s" %% "elastic4s-tcp" % elastic4sVersion,
  "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion,
  "com.sksamuel.elastic4s" %% "elastic4s-streams" % elastic4sVersion,
  "com.sksamuel.elastic4s" %% "elastic4s-circe" % elastic4sVersion,
  "org.apache.logging.log4j" % "log4j-api" % "2.9.1",
  "org.apache.logging.log4j" % "log4j-core" % "2.9.1",
  "com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % "test"
)
Here the log4j entries are a real lifesaver. I would have missed many issues with my application had I not been prudent enough to import them. Next we need to configure the log4j logger so that we can troubleshoot our application easily. Create a file called log4j2.xml in src/main/resources (Log4j 2 looks for a file with that name on the classpath by default):
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <!-- Author: Crunchify.com -->
  <Properties>
    <!-- Directory for rolled-over log files, referenced below as ${logPath} -->
    <Property name="logPath">log</Property>
  </Properties>
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <!-- yyyy is the calendar year; YYYY would be the week-based year -->
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n" />
    </Console>
    <RollingFile name="RollingFile" fileName="log/output.log"
                 filePattern="${logPath}/%d{yyyyMMddHHmmss}-fargo.log">
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n" />
      <Policies>
        <SizeBasedTriggeringPolicy size="100 MB" />
      </Policies>
      <DefaultRolloverStrategy max="20" />
    </RollingFile>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="Console" />
      <AppenderRef ref="RollingFile" />
    </Root>
  </Loggers>
</Configuration>
Installing Elasticsearch on my remote server was pretty easy:
- brew install elasticsearch
- find the file called elasticsearch.yml and add the following line to it: network.host: myremote-server
- brew services start elasticsearch
It is important to set the bind address, otherwise you will not be able to connect to Elasticsearch remotely. I always install all server products remotely so that they don’t slow my MBP down.
To check that Elasticsearch is up and running, point your browser to http://myremote-server:9200. You should see output like the following:
{
  "name" : "NYrp1l-",
  "cluster_name" : "elasticsearch_abhisheksrivastava",
  "cluster_uuid" : "rkNONwmrTq-6tsyvxN5Zig",
  "version" : {
    "number" : "5.6.2",
    "build_hash" : "57e20f3",
    "build_date" : "2017-09-23T13:16:45.703Z",
    "build_snapshot" : false,
    "lucene_version" : "6.6.1"
  },
  "tagline" : "You Know, for Search"
}
Make a note of your cluster name because the TcpClient needs it. A lot of documentation on the internet assumes that your cluster name is elasticsearch, and that is why programs copied from it fail to connect to Elasticsearch.
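If you would rather do this check from code than from the browser, here is a minimal sketch using only the Scala standard library. The object name ClusterInfo and both helper names are my own for illustration, and the regular expression is a shortcut that assumes the response looks like the JSON above; a real application would use a proper JSON parser.

```scala
import scala.io.Source
import scala.util.Try

object ClusterInfo {
  // Matches "cluster_name" : "<value>" in the root endpoint's JSON response.
  private val ClusterName = "\"cluster_name\"\\s*:\\s*\"([^\"]+)\"".r

  /** Pulls the cluster name out of the JSON body, if it is present. */
  def clusterNameFrom(json: String): Option[String] =
    ClusterName.findFirstMatchIn(json).map(_.group(1))

  /** Fetches http://host:port/ and returns the body, or a Failure if the server cannot be reached. */
  def fetchRoot(host: String, port: Int = 9200): Try[String] = Try {
    val source = Source.fromURL(s"http://$host:$port/", "UTF-8")
    try source.mkString finally source.close()
  }
}
```

Calling ClusterInfo.fetchRoot("myremote-server").toOption.flatMap(ClusterInfo.clusterNameFrom) should then give you the cluster name to put in the TcpClient settings, or None if the server is not reachable.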
Now we will write two applications, one using TcpClient and the other using HttpClient, to interact with the Elasticsearch server.
TcpClient.
In order to connect to elasticsearch using the TcpClient, we must be aware of our cluster name.
package com.abhi
import com.sksamuel.elastic4s.{ElasticsearchClientUri, TcpClient}
import org.elasticsearch.common.settings.Settings
object Scala4s extends App {
  // The cluster name must match the "cluster_name" reported by the server
  val settings = Settings.builder().put("cluster.name", "elasticsearch_abhisheksrivastava").build()
  implicit val client = TcpClient.transport(settings, ElasticsearchClientUri("elasticsearch://abhisheks-mini:9300"))
  client.close()
}
Note that I had to specify the name of the cluster in the settings. Without this the application would not have worked.
Now we need to complete three tasks: create an index, insert a document into that index, and finally query the document. To complete each task you will need to familiarize yourself with the Elastic4s DSL. The pattern for using the DSL is pretty consistent: you first create the DSL command, then execute the command using the client.
- Create Index DSL.
createIndex("bands").mappings(mapping("artists") as(textField("name")))
- Insert a document
indexInto("bands" / "artists") doc Artist("nirvana") refresh(RefreshPolicy.IMMEDIATE)
- Query the document
search("bands" / "artists") query "nirvana"
Now all we need to do is tie these three DSL commands into our application and execute them.
We will wrap them in functions and then invoke those functions in a monadic way. The function to create the index (the mapping type is "artists" so that it matches the type we index into and query later):
def createElasticIndex(implicit client: TcpClient): Future[CreateIndexResponse] = {
  import com.sksamuel.elastic4s.ElasticDsl._
  client.execute {
    createIndex("bands").mappings(
      mapping("artists") as(
        textField("name")
      )
    )
  }
}
The function to insert a document into the index:
def insertDocument(implicit client: TcpClient): Future[RichIndexResponse] = {
  import com.sksamuel.elastic4s.ElasticDsl._
  client.execute {
    // IMMEDIATE refresh makes the document searchable as soon as the call returns
    indexInto("bands" / "artists") doc Artist("nirvana") refresh(RefreshPolicy.IMMEDIATE)
  }
}
And the function to query the document:
def queryDocument(artist: String)(implicit client: TcpClient): Future[RichSearchResponse] = {
  import com.sksamuel.elastic4s.ElasticDsl._
  client.execute {
    search("bands" / "artists") query artist
  }
}
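Now we tie these together with a for comprehension, so that each step starts only after the previous one has completed:
val futureArtist = for {
  _ <- createElasticIndex
  _ <- insertDocument
  resp <- queryDocument("nirvana")
} yield resp.to[Artist]
// Close the client whether the chain succeeded or failed
futureArtist.onComplete(_ => client.close())
val artistList = Await.result(futureArtist, Duration.Inf)
artistList.foreach(println)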
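To see what "monadic" buys us here, the sketch below runs the same shape of for comprehension over three stand-in steps that only record their names. Nothing in it touches Elasticsearch; ChainingSketch, run and step are hypothetical names used purely for illustration. It shows that the steps run strictly in order and that a failed step stops the chain, so the query never runs against a missing document.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object ChainingSketch {
  /** Runs three stand-in steps in order; returns the steps that ran and how the chain ended. */
  def run(insertFails: Boolean): (List[String], Try[String]) = {
    val ran = ListBuffer.empty[String]

    // Stand-in for a client.execute call: records its name, or fails on request.
    def step(name: String, fail: Boolean = false): Future[String] = Future {
      if (fail) throw new RuntimeException(s"$name failed")
      ran.synchronized { ran += name }
      name
    }

    // Same shape as the create / insert / query chain in the article.
    val chain = for {
      _    <- step("create")
      _    <- step("insert", fail = insertFails)
      resp <- step("query")
    } yield resp

    val outcome = Try(Await.result(chain, 5.seconds))
    (ran.synchronized(ran.toList), outcome)
  }
}
```

With insertFails = false all three steps are recorded in order; with insertFails = true only the create step is recorded and the overall result is a failure carrying the insert error.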
HttpClient.
Establishing a connection via the HttpClient is a little easier because we can connect with just the server name and port (without knowing the cluster name).
val client = HttpClient(ElasticsearchClientUri("abhisheks-mini", 9200))
client.close()
The process of using the DSL is the same. We just have to be careful to import the HTTP DSL, com.sksamuel.elastic4s.http.ElasticDsl._
At first I imported the TcpClient DSL from the previous section by mistake, and my application had tons of compiler errors.
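My understanding of why the wrong import produces compiler errors rather than a runtime failure is that each client's execute method needs an implicit "how do I run this request" instance, and each DSL import brings in the instances for its own transport only. The toy model below illustrates that mechanism. Every name in it (TcpExec, HttpExec, ToyTcpClient, ToyHttpClient, TcpDsl, HttpDsl) is made up for this sketch and is not Elastic4s's actual source.

```scala
object DslImportSketch {
  final case class CreateIndex(name: String)

  // One "executable" type class per transport (hypothetical stand-ins).
  trait TcpExec[T]  { def describe(request: T): String }
  trait HttpExec[T] { def describe(request: T): String }

  // Each toy client demands the type class that belongs to its transport.
  class ToyTcpClient  { def execute[T](request: T)(implicit e: TcpExec[T]): String  = e.describe(request) }
  class ToyHttpClient { def execute[T](request: T)(implicit e: HttpExec[T]): String = e.describe(request) }

  // Both DSLs offer the same createIndex syntax but different implicit instances.
  object TcpDsl {
    def createIndex(name: String): CreateIndex = CreateIndex(name)
    implicit val createIndexTcp: TcpExec[CreateIndex] = new TcpExec[CreateIndex] {
      def describe(request: CreateIndex): String = s"create ${request.name} over TCP"
    }
  }
  object HttpDsl {
    def createIndex(name: String): CreateIndex = CreateIndex(name)
    implicit val createIndexHttp: HttpExec[CreateIndex] = new HttpExec[CreateIndex] {
      def describe(request: CreateIndex): String = s"PUT /${request.name} over HTTP"
    }
  }

  def viaTcp(): String = {
    import TcpDsl._
    new ToyTcpClient().execute(createIndex("bands"))
  }

  def viaHttp(): String = {
    import HttpDsl._
    // With `import TcpDsl._` here instead, this line would not compile:
    // no implicit HttpExec[CreateIndex] would be in scope.
    new ToyHttpClient().execute(createIndex("bands"))
  }
}
```

The DSL syntax looks identical in both cases, which is exactly why the mistake is easy to make; only the implicit instances differ.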
Once again we write three functions: one to create the index, one to insert documents into the index, and one to query it.
def createMyIndex(implicit client: HttpClient): Future[CreateIndexResponse] = {
  import com.sksamuel.elastic4s.http.ElasticDsl._
  client.execute {
    createIndex("myindex").mappings(mapping("mytype") as(textField("country"), textField("capital")))
  }
}
def insertDocument(implicit client: HttpClient): Future[BulkResponse] = {
  import com.sksamuel.elastic4s.http.ElasticDsl._
  client.execute {
    // A single bulk request indexes both documents
    bulk(
      indexInto("myindex" / "mytype").fields("country" -> "Mongolia", "capital" -> "Ulaanbaatar"),
      indexInto("myindex" / "mytype").fields("country" -> "Namibia", "capital" -> "Windhoek")
    ).refresh(RefreshPolicy.IMMEDIATE)
  }
}
def queryCapital(capital: String)(implicit client: HttpClient): Future[SearchResponse] = {
  import com.sksamuel.elastic4s.http.ElasticDsl._
  client.execute {
    search("myindex").matchQuery("capital", capital)
  }
}
Finally, we can connect all three functions by means of a simple for comprehension:
val futureResult = for {
  _ <- createMyIndex
  _ <- insertDocument
  response <- queryCapital("Ulaanbaatar")
} yield response.to[Result]
// Close the client whether the chain succeeded or failed
futureResult.onComplete { _ => client.close() }
val results = Await.result(futureResult, Duration.Inf)
results.foreach(println)
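One thing to keep in mind with Await.result and Duration.Inf is that the program blocks forever if the server never answers. For anything beyond a learning exercise, a bounded wait is probably safer. Here is a minimal sketch using only the standard library; BoundedWait and awaitWithin are hypothetical names, and the right time limit depends on your setup.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._

object BoundedWait {
  /**
   * Waits at most `limit` for the future.
   * A timeout becomes a Left; any other failure of the future is rethrown as usual.
   */
  def awaitWithin[A](future: Future[A], limit: FiniteDuration): Either[String, A] =
    try Right(Await.result(future, limit))
    catch { case _: TimeoutException => Left(s"no response within $limit") }
}
```

In the application above this would replace the last two lines with something like BoundedWait.awaitWithin(futureResult, 30.seconds), followed by a match on the Either.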
The whole application can be found on my GitHub.