Parallel data management for complex queries over many records, using a familiar N1QL-like syntax.
For complex and long-running queries, involving large ad hoc join, set, aggregation, and grouping operations, Couchbase Data Platform offers the Couchbase Analytics Service (CBAS). This is the analytic counterpart to our operational data focussed Query Service. The analytics service is available in Couchbase Data Platform 6.0 and later (developer preview in 5.5).
Getting Started
After familiarizing yourself with our introductory primer, in particular creating a dataset and linking it to a bucket to shadow the operational data, try Couchbase Analytics using the Scala SDK. Intentionally, the API for analytics is very similar to that of the query service.
Before starting, here’s all imports used in the following examples:
import com.couchbase.client.scala._
import com.couchbase.client.scala.analytics.{AnalyticsOptions, AnalyticsParameters, AnalyticsResult, ReactiveAnalyticsResult}
import com.couchbase.client.scala.json._
import reactor.core.scala.publisher.{SFlux, SMono}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
Here’s a complete example of doing an analytics query and handling the results:
val query = """select "hello" as greeting;"""
val result: Try[AnalyticsResult] = cluster.analyticsQuery(query)
val rows: Try[Seq[JsonObject]] = result.flatMap(_.rowsAs[JsonObject])
rows match {
case Success(r: Seq[JsonObject]) => println(s"Row: ${r.head}")
case Failure(err) => println(s"Failure ${err}")
}
Let’s break this down. First, we get the results in the form of a Try[AnalyticsResult]
.
The Scala SDK returns Try
rather than throwing exceptions, to allow you to handle errors in a functional way.
A Try
can either be a Success(AnalyticsResult)
if the query was successfully executed, or Failure(Throwable)
if something went wrong.
An AnalyticsResult
contains various things of interest, such as metrics, but the main thing we’re interested in are the rows (results). They’re fetched with the allRowsAs
call.
Note that the flatMap
means the allRowsAs
call will only be attempted if the initial query was successful.
Otherwise rows
will contain the Failure(Throwable)
from the query result.
Here we’re fetching rows converted into JsonObject
, but as with N1QL there’s many more options available.
Rows can be returned as JSON representations from multiple third party Scala libraries, such as Circe, directly as case classes, and more.
Please see JSON Libraries for full details.
Finally, we pattern match on the rows
to find whether the operations were successful.
We check explicitly for AnalyticsError
which indicates an error from the analytics service.
There can be other errors returned, please see Error Handling for details.
We can write that example more concisely, like this:
cluster.analyticsQuery("""select "hello" as greeting;""")
.flatMap(_.rowsAs[JsonObject]) match {
case Success(r) => println(s"Row: ${r.head}")
case Failure(err) => println(s"Failure ${err}")
}
Queries
A query can either be simple
or be parameterized
. If parameters are used, they can either be positional
or named
.
Here is one example of each:
cluster.analyticsQuery(
"""select airportname, country from airports where country = ?;""",
AnalyticsOptions().parameters(AnalyticsParameters.Positional("France")))
.flatMap(_.rowsAs[JsonObject]) match {
case Success(r) => r.foreach(row => println(s"Row: ${row}"))
case Failure(err) => println(s"Failure ${err}")
}
cluster.analyticsQuery(
"""select airportname, country from airports where country = $country;""",
AnalyticsOptions().parameters(AnalyticsParameters.Named(Map("country" -> "France"))))
.flatMap(_.rowsAs[JsonObject]) match {
case Success(r) => r.foreach(row => println(s"Row: ${row}"))
case Failure(err) => println(s"Failure ${err}")
}
Additional Parameters
The handful of additional parameters are illustrated here:
cluster.analyticsQuery(
"""select airportname, country from airports where country = "France";""",
AnalyticsOptions()
// Ask the analytics service to give this request higher priority
.priority(true)
// The client context id is returned in the results, so can be used by the
// application to correlate requests and responses
.clientContextId("my-id")
// Override how long the analytics query is allowed to take before timing out
.timeout(90.seconds)
) match {
case Success(r: AnalyticsResult) =>
assert(r.metaData.clientContextId.contains("my-id"))
case Failure(err) => println(s"Failure ${err}")
}
}
Metadata
AnalyticsResult
contains a meta.metrics
field that contains useful metadata, such as elapsedTime
, and resultCount
:
val stmt =
"""select airportname, country from airports where country = "France";"""
cluster.analyticsQuery(stmt) match {
case Success(result) =>
val metrics = result.metaData.metrics
println(s"Elapsed: ${metrics.elapsedTime}")
println(s"Results: ${metrics.resultCount}")
println(s"Errors: ${metrics.errorCount}")
case Failure(err) => println(s"Failure ${err}")
}
For a full listing of available Metrics
in Metadata
, see the Understanding Analytics documentation.
Streaming Large Result Sets
The Scala SDK provides three SDKs (documented further on Choosing an API):
-
The blocking API you’ve seen so far, that returns an
AnalyticsResult
containing all rows. -
An async API that returns a
Future[AnalyticsResult]
, which also contains all rows. This can be accessed like this:
// When we work with Scala Futures an ExecutionContext must be provided.
// For this example we'll just use the global default
import scala.concurrent.ExecutionContext.Implicits.global
val stmt = """select airportname, country from airports where country = "France";"""
val future: Future[AnalyticsResult] = cluster.async.analyticsQuery(stmt)
future onComplete {
case Success(result) =>
result.rowsAs[JsonObject] match {
case Success(rows) => rows.foreach(println(_))
case Failure(err) => println(s"Error: $err")
}
case Failure(err) => println(s"Error: $err")
}
-
A reactive API, that can be used to stream rows.
The former two APIs buffer all rows in-memory until they can be returned to you. With smaller queries this is likely to be fine, but for large data sets this could lead to Java OutOfMemoryError
exceptions.
The recommended solution is to use the reactive API. Reactive programming is a sophisticated paradigm that is rapidly gaining popularity for its ability to handle, amongst other things, streaming large amounts of data over fallible networks, while allowing error handling and backpressure.
The Scala SDK exposes primitives from the Project Reactor library, most notably Mono
and Flux
. We strongly recommend learning a little of this library first, and the following examples will assume basic familiarity with Reactor.
You’ll see both reactor.core.scala.publisher and reactor.core.publisher imports available for Reactor. Use the former, it is the Scala-optimized variant that the Scala SDK will return.
|
Here’s how to perform a query and stream the results using the reactive API:
val stmt =
"""select airportname, country from airports where country = "France";"""
val mono: SMono[ReactiveAnalyticsResult] = cluster.reactive.analyticsQuery(stmt)
val rows: SFlux[JsonObject] = mono
// ReactiveQueryResult contains a rows: Flux[AnalyticsRow]
.flatMapMany(result => result.rowsAs[JsonObject])
// Just for example, block on the rows. This is not best practice and apps
// should generally not block.
val allRows: Seq[JsonObject] = rows
.doOnNext(row => println(row))
.doOnError(err => println(s"Error: $err"))
.collectSeq()
.block()