Parallel data management for complex queries over many records, using a familiar SQL++ syntax.
This page covers Couchbase’s traditional Analytics Service, for long-running queries. A separate, as-a-service offering, Capella Columnar, offers real-time analytics.
Columnar SDKs
SDKs for Capella Columnar, Couchbase’s analytical database (RT-OLAP) for real-time apps and operational intelligence, are available for the Java, Node.js, and Python platforms. See the Capella Columnar SDK pages for more information.
For complex and long-running queries involving large ad hoc join, set, aggregation, and grouping operations, Couchbase Data Platform offers the Couchbase Analytics Service (CBAS). This is the analytic counterpart to our Query Service, which focuses on operational data.
The Analytics Service is available in Capella Operational or the Enterprise Edition of self-managed Couchbase Server.
Getting Started
After familiarizing yourself with our introductory primer, in particular creating a dataset and linking it to a bucket, try Couchbase Analytics using the Python SDK. Intentionally, the API for analytics is nearly identical to that of the query service.
Here’s a complete example of running an analytics query and handling the results:
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, AnalyticsOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.exceptions import CouchbaseException
from couchbase.analytics import AnalyticsScanConsistency

cluster = Cluster.connect(
    "couchbase://your-ip",
    ClusterOptions(PasswordAuthenticator("Administrator", "password")))
bucket = cluster.bucket("travel-sample")
collection = bucket.default_collection()

try:
    result = cluster.analytics_query("SELECT 'hello' AS greeting")

    # iterate over the returned rows
    for row in result.rows():
        print("Found row: {}".format(row))

    print("Reported execution time: {}".format(
        result.metadata().metrics().execution_time()))

except CouchbaseException as ex:
    import traceback
    traceback.print_exc()
When using a Couchbase Server version earlier than 6.5, you must create a valid Bucket connection using cluster.bucket(name) before you can use Analytics.
Let’s break it down. An analytics query is always performed at the Cluster level, using the analytics_query method. It takes the statement as a required argument and lets you provide additional options if needed.
Once a result returns, you can iterate over the returned rows and/or access the AnalyticsMetaData associated with the query.
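The flow described above can be wrapped in a small helper. The following is a minimal sketch, not part of the SDK: `run_analytics` is a hypothetical name, and the `Stub*` classes are stand-ins that only mimic the `rows()` and `metadata().metrics().execution_time()` calls used in the example, so the snippet runs without a cluster. Against a real deployment you would pass the connected cluster object instead.

```python
from datetime import timedelta


def run_analytics(cluster, statement):
    """Run a statement and return (rows, reported execution time).

    `cluster` is anything exposing analytics_query(statement), such as the
    connected Cluster object from the example above.
    """
    result = cluster.analytics_query(statement)
    # Consume the rows first, then read the metadata, in the same order
    # as the complete example.
    rows = list(result.rows())
    elapsed = result.metadata().metrics().execution_time()
    return rows, elapsed


# --- Hypothetical stand-ins so the sketch runs without a cluster ---
class StubMetrics:
    def execution_time(self):
        return timedelta(milliseconds=12)


class StubMetaData:
    def metrics(self):
        return StubMetrics()


class StubResult:
    def rows(self):
        return iter([{"greeting": "hello"}])

    def metadata(self):
        return StubMetaData()


class StubCluster:
    def analytics_query(self, statement, *args, **kwargs):
        return StubResult()


rows, elapsed = run_analytics(StubCluster(), "SELECT 'hello' AS greeting")
print(rows, elapsed)
```

Returning the rows as a list keeps the caller simple, at the cost of holding the whole result in memory; for large results, iterating directly as in the example above is the better fit.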
Parameterized Queries
Supplying parameters as individual arguments to the query allows the analytics engine to optimize the parsing and planning of the query. You can either supply these parameters by name or by position.
The first two examples supply the parameter by position; the last two supply it by name:
# positional parameter passed as an extra argument
result = cluster.analytics_query(
    "SELECT count(*) FROM airports a WHERE a.country = ?",
    "France")

# positional parameter passed through AnalyticsOptions
result = cluster.analytics_query(
    "SELECT count(*) FROM airports a WHERE a.country = ?",
    AnalyticsOptions(positional_parameters=["France"]))

# named parameter passed as a keyword argument
result = cluster.analytics_query(
    "SELECT count(*) FROM airports a WHERE a.country = $country",
    country="France")

# named parameter passed through AnalyticsOptions
result = cluster.analytics_query(
    "SELECT count(*) FROM airports a WHERE a.country = $country",
    AnalyticsOptions(named_parameters={"country": "France"}))
The complete code for this page’s example can be found at analytics_ops.py.
Which style you choose is up to you; for readability in more complex queries, we generally recommend named parameters.
Note that parameters cannot be used in every position of a statement.
If you put one in an unsupported place, the server responds with a CompilationFailedException or similar.
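Because a placeholder cannot stand in every position, one common pattern is to keep values in parameters and build the remaining parts of the statement from a fixed allow-list. The sketch below assumes that a dataset name is one such unsupported position; `ALLOWED_DATASETS` and `build_country_count` are hypothetical names introduced for illustration, and `airlines` is a made-up second dataset.

```python
# Hypothetical allow-list of dataset names the application may query.
ALLOWED_DATASETS = {"airports", "airlines"}


def build_country_count(dataset):
    """Return a statement counting the rows for one country in an allow-listed dataset.

    The dataset name is interpolated into the statement text, so it is checked
    against a fixed allow-list; the country value stays a named parameter.
    """
    if dataset not in ALLOWED_DATASETS:
        raise ValueError("unknown dataset: {!r}".format(dataset))
    return "SELECT count(*) FROM {} d WHERE d.country = $country".format(dataset)


statement = build_country_count("airports")
print(statement)
# The value is still supplied as a parameter, for example:
#   cluster.analytics_query(statement, country="France")
```

Checking against an allow-list, rather than trying to sanitize arbitrary input, keeps user-supplied text out of the statement entirely.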
The Analytics Result
When performing an analytics query, the response you receive is an AnalyticsResult. If no exception is raised, the request succeeded, and the result provides access to both the returned rows and the associated AnalyticsMetaData.
result = cluster.analytics_query(
    "SELECT a.* FROM airports a LIMIT 10")

# iterate over rows
for row in result:
    # each row is one result object returned by the query
    print("Found row: {}".format(row))
The AnalyticsMetaData provides insight into some basic profiling and timing information, as well as information such as the client context ID.
Name | Description |
---|---|
request_id() | Returns the request identifier of this request. |
client_context_id() | Returns the context ID either generated by the SDK or supplied by the user. |
status() | An enum representing the state of the result. |
metrics() | Returns metrics provided by analytics for the request. |
signature() | If a signature is present, it will be available to consume in a generic fashion. |
warnings() | Non-fatal errors are available to consume as warnings on this method. |
For example, here is how you can print the execution time of a query:
result = cluster.analytics_query("SELECT 1=1")
print("Execution time: {}".format(
    result.metadata().metrics().execution_time()))
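The reported execution time can also feed simple monitoring. The following is a minimal sketch that assumes `execution_time()` returns a `datetime.timedelta`; `to_millis`, `is_slow`, and `SLOW_QUERY_THRESHOLD` are hypothetical names, and the sample value stands in for a real measurement.

```python
from datetime import timedelta

# Hypothetical threshold above which a query is reported as slow.
SLOW_QUERY_THRESHOLD = timedelta(seconds=2)


def to_millis(duration):
    """Convert a timedelta to fractional milliseconds."""
    return duration / timedelta(milliseconds=1)


def is_slow(duration, threshold=SLOW_QUERY_THRESHOLD):
    """Return True when the reported execution time exceeds the threshold."""
    return duration > threshold


# Sample value standing in for result.metadata().metrics().execution_time()
sample = timedelta(milliseconds=1500)
print("{:.1f} ms, slow={}".format(to_millis(sample), is_slow(sample)))
```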
Analytics Options
The analytics service provides an array of options to customize your query. The following table lists them all:
Name | Description |
---|---|
client_context_id | Sets a context ID returned by the service for debugging purposes. |
positional_parameters | Sets positional arguments for a parameterized query. |
named_parameters | Sets named arguments for a parameterized query. |
priority | Assigns a different server-side priority to the query. |
raw | Escape hatch to add arguments that are not covered by these options. |
read_only | Tells the client and server that this query is read-only. |
scan_consistency | Sets a different scan consistency for this query. |
Scan Consistency
By default, the analytics engine returns whatever is currently in the index at the time of the query (this mode is also called AnalyticsScanConsistency.NOT_BOUNDED). If you need to include everything that has just been written, you must choose a different scan consistency. With AnalyticsScanConsistency.REQUEST_PLUS, the query will likely take a bit longer to return results, but the analytics engine makes sure that they are as up-to-date as possible.
result = cluster.analytics_query(
    "SELECT count(*) FROM airports a WHERE a.country = 'France'",
    AnalyticsOptions(scan_consistency=AnalyticsScanConsistency.REQUEST_PLUS))
Client Context Id
The SDK will always send a client context ID with each query, even if none is provided by the user. By default a UUID will be generated that is mirrored back from the analytics engine and can be used for debugging purposes. A custom string can always be provided if you want to introduce application-specific semantics into it (so that for example in a network dump it shows up with a certain identifier). Whatever is chosen, we recommend making sure it is unique so different queries can be distinguished during debugging or monitoring.
import uuid

result = cluster.analytics_query(
    "SELECT count(*) FROM airports a WHERE a.country = 'France'",
    AnalyticsOptions(client_context_id="user-44{}".format(uuid.uuid4())))
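One way to keep custom context IDs both recognizable and unique is to combine an application-specific prefix with a random UUID. A minimal sketch follows; `make_context_id` is a hypothetical helper, not an SDK function.

```python
import uuid


def make_context_id(prefix):
    """Return '<prefix>-<uuid4>' so every query gets a distinct, searchable ID."""
    return "{}-{}".format(prefix, uuid.uuid4())


context_id = make_context_id("user-44")
print(context_id)
# Pass it to the query as shown above:
#   AnalyticsOptions(client_context_id=context_id)
```

The separator between prefix and UUID makes it easier to filter logs or network dumps by prefix alone.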
Priority
By default, every analytics query has the same priority on the server. Setting the priority option to True indicates that you need expedited dispatch in the analytics engine for this request.
result = cluster.analytics_query(
    "SELECT count(*) FROM airports a WHERE a.country = 'France'",
    AnalyticsOptions(priority=True))
Readonly
If the query is marked as read-only, both the server and the SDK can improve processing of the operation. On the client side, the SDK can be more liberal with retries because it can be sure that no state-mutating side effects occur. On the server side, the engine ensures when parsing and planning the query that no data is actually mutated.
result = cluster.analytics_query(
    "SELECT count(*) FROM airports a WHERE a.country = 'France'",
    AnalyticsOptions(read_only=True))
Async APIs
In addition to the blocking API on Cluster, the SDK provides asyncio and Twisted APIs on ACluster and TxCluster respectively.
If you are in doubt about which API to use, we recommend looking at the asyncio API first.
Simple queries with both the asyncio and Twisted APIs look similar to the blocking one:
from acouchbase.cluster import Cluster, get_event_loop
from couchbase.options import AnalyticsOptions, ClusterOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.exceptions import CouchbaseException


async def get_couchbase():
    cluster = Cluster(
        "couchbase://your-ip",
        ClusterOptions(PasswordAuthenticator("Administrator", "password")))
    bucket = cluster.bucket("travel-sample")
    await bucket.on_connect()
    collection = bucket.default_collection()

    return cluster, bucket, collection


# NOTE: the analytics dataset might need to be created
async def simple_query(cluster):
    try:
        result = cluster.analytics_query(
            "SELECT id, country FROM airports a WHERE a.country = $country LIMIT 10",
            AnalyticsOptions(named_parameters={"country": "France"}))
        async for row in result:
            print("Found row: {}".format(row))
    except CouchbaseException as ex:
        print(ex)


loop = get_event_loop()
cluster, bucket, collection = loop.run_until_complete(get_couchbase())
loop.run_until_complete(simple_query(cluster))
# **IMPORTANT** need to do this import prior to importing the reactor (new to the Python 4.x SDK)
import txcouchbase
from twisted.internet import reactor

from txcouchbase.cluster import TxCluster
from couchbase.options import AnalyticsOptions, ClusterOptions
from couchbase.auth import PasswordAuthenticator


def handle_query_results(result):
    for r in result.rows():
        print("query row: {}".format(r))
    reactor.stop()


def on_streaming_error(error):
    print("Streaming operation had an error.\nError: {}".format(error))
    reactor.stop()


# NOTE: the analytics dataset might need to be created
def on_connect_ok(result, cluster):
    # create a bucket object
    bucket = cluster.bucket("travel-sample")
    # create a collection object
    cb = bucket.default_collection()

    d = cluster.analytics_query(
        "SELECT id, country FROM airports a WHERE a.country = $country LIMIT 10",
        AnalyticsOptions(named_parameters={"country": "France"}))
    d.addCallback(handle_query_results).addErrback(on_streaming_error)


def on_connect_err(error):
    print("Unable to connect.\n{}".format(error))


cluster = TxCluster("couchbase://your-ip",
                    ClusterOptions(PasswordAuthenticator("Administrator", "password")))

# wait for connect
cluster.on_connect().addCallback(on_connect_ok, cluster).addErrback(on_connect_err)

reactor.run()
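For the asyncio variant, the `async for` loop can be factored into a helper when you prefer to materialize the rows before processing them. The sketch below assumes only that the result is an async iterable, as in the asyncio example; `collect_rows` and `fake_result` are hypothetical names, and `fake_result` merely stands in for a real result so the snippet runs without a cluster.

```python
import asyncio


async def collect_rows(result):
    """Drain an async-iterable result into a list."""
    rows = []
    async for row in result:
        rows.append(row)
    return rows


async def fake_result():
    # Stand-in for the object returned by cluster.analytics_query(...):
    # yields two sample rows.
    yield {"id": 1, "country": "France"}
    yield {"id": 2, "country": "France"}


rows = asyncio.run(collect_rows(fake_result()))
print(rows)
```

Inside a coroutine such as `simple_query`, the equivalent call would be `rows = await collect_rows(result)`.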
Scoped Queries on Named Collections
In addition to creating a dataset with a WHERE clause to filter the results to documents with certain characteristics, SDK 3.2 now allows you to create a dataset against a named collection, for example:
ALTER COLLECTION `travel-sample`.inventory.airport ENABLE ANALYTICS;
-- NB: this is more or less equivalent to:
CREATE DATAVERSE `travel-sample`.inventory;
CREATE DATASET `travel-sample`.inventory.airport ON `travel-sample`.inventory.airport;
We can then query the Dataset as normal, using the fully qualified keyspace:
result = cluster.analytics_query(
    "SELECT airportname, country FROM `travel-sample`.inventory.airport a WHERE a.country = 'France' LIMIT 3")
Note that using the CREATE DATASET syntax we could choose any Dataset name in any Dataverse, including the default.
However, the SDK supports this standard convention, allowing us to query from the Scope object:
scope = bucket.scope("inventory")
result = scope.analytics_query(
    "SELECT airportname, country FROM airport a WHERE a.country = 'France' LIMIT 3")