Hello Columnar — Python SDK Quickstart Guide
Install, connect, try. A quick start guide to get you up and running with Columnar and the Python Columnar SDK.
Capella Columnar is a real-time analytical database (RT-OLAP) for real time apps and operational intelligence. Although maintaining some syntactic similarities with the operational SDKs, the Python Columnar SDK is developed from the ground-up for Columnar’s analytical use cases, and supports streaming APIs to handle large datasets.
Before You Start
Sign up for a Capella account, and choose a Columnar cluster.
You’ll need to add your IP address to the allowlist, during the sign-up and cluster creation process (this can also be done at any time, via the UI, should the address change, or if you need to add a new one).
Prerequisites
Currently Python 3.9 - Python 3.12 is supported. See the compatibility page for more information about platform support.
Getting the SDK
The SDK can be installed via pip
:
python -m pip install couchbase-columnar
For other installation methods, see the installation page.
Connecting and Executing a Query
Synchronous API
from couchbase_columnar.cluster import Cluster
from couchbase_columnar.credential import Credential
from couchbase_columnar.options import (ClusterOptions,
QueryOptions,
SecurityOptions)
def main() -> None:
# Update this to your cluster
connstr = 'couchbases://--your-instance--'
username = 'username'
pw = 'Password!123'
# User Input ends here.
cred = Credential.from_username_and_password(username, pw)
cluster = Cluster.create_instance(connstr, cred)
# Execute a query and buffer all result rows in client memory.
statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
res = cluster.execute_query(statement)
all_rows = res.get_all_rows()
for row in all_rows:
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a query and process rows as they arrive from server.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
res = cluster.execute_query(statement)
for row in res.rows():
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a streaming query with positional arguments.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
res = cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
for row in res:
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a streaming query with named arguments.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
res = cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
'limit': 10}))
for row in res.rows():
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
if __name__ == '__main__':
main()
Asynchronous (asyncio) API
from acouchbase_columnar import get_event_loop
from acouchbase_columnar.cluster import AsyncCluster
from couchbase_columnar.credential import Credential
from couchbase_columnar.options import (ClusterOptions,
QueryOptions,
SecurityOptions)
async def main() -> None:
# Update this to your cluster
connstr = 'couchbases://--your-instance--'
username = 'username'
pw = 'Password!123'
# User Input ends here.
cred = Credential.from_username_and_password(username, pw)
cluster = AsyncCluster.create_instance(connstr, cred)
# Execute a query and buffer all result rows in client memory.
statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
res = await cluster.execute_query(statement)
all_rows = await res.get_all_rows()
# NOTE: all_rows is a list, _do not_ use `async for`
for row in all_rows:
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a query and process rows as they arrive from server.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
res = await cluster.execute_query(statement)
async for row in res.rows():
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a streaming query with positional arguments.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
res = await cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
async for row in res:
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a streaming query with named arguments.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
res = await cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
'limit': 10}))
async for row in res.rows():
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
if __name__ == '__main__':
loop = get_event_loop()
loop.run_until_complete(main())