Goal: Demonstrate accessing a REST endpoint with a cURL GET request to fetch Daily Exchange Rate data.
Implementation: Create a JavaScript function that contains an OnUpdate handler with a cURL GET request in a Timer callback function. The handler listens for mutations or data changes within a specified source bucket. When any document within the bucket is created or modified, the Eventing Function schedules a callback in the future that executes a user-defined routine to fetch exchange rates. The initial timer fires just 30 seconds in the future; timers 2 through N fire at the beginning of each following day. In this example, we rely on a control document which, when mutated, controls whether a Recurring Timer is created or canceled.
We will also rely on a public REST API that can be accessed via a cURL GET. Please test the commands below on a command line before proceeding to make sure one of the REST endpoints is live.
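The scheduling rule described above (first timer 30 seconds out, later timers at the start of the next day) can be sketched in plain JavaScript. This is only an illustration that runs outside Eventing: the helper name nextFireTime and its explicit "now" parameter are assumptions made for this sketch, not part of the Function shown later.

```javascript
// Hypothetical helper: compute when the next timer should fire.
// "mode" uses the same two values as the Eventing Function in this example.
function nextFireTime(mode, now) {
    var next = new Date(now.getTime());
    if (mode === "via_onupdate") {
        // first timer: 30 seconds after the control document mutation
        next.setSeconds(next.getSeconds() + 30);
    } else {
        // timers 2 to N: midnight (local time) at the start of the next day
        next.setHours(0, 0, 0, 0);
        next.setDate(next.getDate() + 1);
    }
    return next;
}

var now = new Date(2021, 6, 18, 14, 37, 25); // 2021-07-18 14:37:25 local time
var first = nextFireTime("via_onupdate", now);
var daily = nextFireTime("via_callback", now);
console.log(first.toString());
console.log(daily.toString());
```

Passing "now" in explicitly keeps the calculation easy to check by hand; inside Eventing the Function simply uses new Date().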
curl -q -X GET 'https://api.frankfurter.app/latest'
curl -q -X GET 'https://api.ratesapi.io/api/latest'
If both of the above URLs are offline, there are pay sites that provide similar information, or you can roll your own mini server in Python.
#!/usr/bin/env python3
from http.server import HTTPServer, BaseHTTPRequestHandler
from http import HTTPStatus
from datetime import datetime
import json


class _RequestHandler(BaseHTTPRequestHandler):
    # Borrowing from https://gist.github.com/nitaku/10d0662536f37a087e1b
    def _set_headers(self):
        self.send_response(HTTPStatus.OK.value)
        self.send_header('Content-type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()

    def do_GET(self):
        # Build the payload as a dict so json.dumps emits a JSON object
        # (dumping a pre-built JSON string would double-encode it).
        message = {
            'amount': 1.0,
            'base': 'EUR',
            'date': datetime.today().strftime('%Y-%m-%d'),
            'rates': {'AUD': 1.5907, 'BGN': 1.9558, 'USD': 1.1802},
        }
        self._set_headers()
        self.wfile.write(json.dumps(message).encode('utf-8'))


def run_server():
    # Listen on all interfaces, port 8001
    server_address = ('', 8001)
    httpd = HTTPServer(server_address, _RequestHandler)
    print('serving at %s:%d' % server_address)
    httpd.serve_forever()


if __name__ == '__main__':
    run_server()
The Couchbase Eventing Code to perform a cURL request is pretty trivial (below), but in this example the final Eventing function will do a lot more, such as adding a recurring timer to build up a set of documents with the exchange rates for every day.
// ==============================
// Perform a cURL GET here
var request = {
    path: apiReqDateUtc
};
// perform the cURL request using the URL alias from the settings
var response = curl('GET', exchangeRateApi, request);
var status = "OKAY";
if (response.status != 200 && response.status != 302) {
    status = "FAIL";
}
// ==============================
Although either of the above remote APIs can serve up the rates for an exact date by replacing "latest" with a date such as 2020-08-05, we will rely on the "latest" results. On success we expect to see a returned JSON payload as follows:
{
"base": "EUR",
"rates": {
"GBP": 0.90265,
"HKD": 9.1871,
"IDR": 17247.57,
"ILS": 4.0397,
"DKK": 7.4508,
"INR": 88.709,
"CHF": 1.077,
"MXN": 26.7125,
"CZK": 26.097,
"SGD": 1.6228,
"THB": 36.759,
"HRK": 7.4683,
"MYR": 4.9698,
"NOK": 10.6585,
"CNY": 8.2277,
"BGN": 1.9558,
"PHP": 58.12,
"SEK": 10.2865,
"PLN": 4.3935,
"ZAR": 20.4221,
"CAD": 1.5703,
"ISK": 160.2,
"BRL": 6.2311,
"RON": 4.8345,
"NZD": 1.7809,
"TRY": 8.3311,
"JPY": 125.37,
"RUB": 86.3692,
"KRW": 1405.74,
"USD": 1.1854,
"HUF": 344.5,
"AUD": 1.6415
},
"date": "2020-08-05"
}
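Before storing a payload like the one above, it helps to confirm it has the expected shape and to derive the document key from its date. The following is a minimal sketch, assuming the same check the Eventing Function in this example applies (a "date" and a "base" field must be present); the helper name toExchangeDoc is hypothetical.

```javascript
// Hypothetical helper: validate a response body and derive the key under
// which the exchange rate document would be stored.
function toExchangeDoc(body) {
    if (!body || !body.date || !body.base) {
        return null; // not a usable exchange rate payload
    }
    return { key: "exchange::" + body.date, value: body };
}

var sample = { base: "EUR", date: "2020-08-05", rates: { USD: 1.1854, GBP: 0.90265 } };
var good = toExchangeDoc(sample);
var bad = toExchangeDoc({ error: "not found" });
console.log(good.key); // exchange::2020-08-05
console.log(bad);      // null
```

Keying the document by its date means that a repeated fetch for the same day overwrites the earlier document instead of creating a duplicate.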
If you roll your own server using the above Python sample (use a real IP instead of localhost if you are not on the Couchbase server), the returned data will be static but the date will always be current. Note that the sample serves plain HTTP, not HTTPS.
curl -q -X GET 'http://localhost:8001/api/latest'
{
"amount": 1.0,
"base": "EUR",
"date": "2021-07-18",
"rates": {
"AUD": 1.5907,
"BGN": 1.9558,
"USD": 1.1802
}
}
We will perform the following tests with the Eventing Function:
-
Test 1: The control document is created or mutated in such a way that a Timer is created, and initially fires approximately 30 seconds in the future. At this point a document is fetched from an External Rest endpoint (representing the user work) and this document is written to the source bucket. The original control document, in the source bucket, is not changed. The Timer is re-armed for the beginning of the following day and will execute again and again until canceled fetching a new Daily Exchange Rate.
-
Test 2: The control document is mutated in such a way that any existing Timer with the reference of the control document’s id (meta.id) is canceled. This has no effect if the Timer created has already fired.
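The two tests above depend on how a timer reference behaves: a timer is identified by its reference (here the control document's meta.id), and canceling only succeeds while that timer is still pending. The toy in-memory model below illustrates this. It is not the Eventing implementation; the TimerRegistry name and its methods are hypothetical, and the "same reference overwrites" behavior is an assumption based on the "create/overwrite" wording in this example's log messages.

```javascript
// Toy model of timers keyed by a reference string (illustration only).
function TimerRegistry() {
    this.pending = {};
}
// Creating a timer with an existing reference replaces the pending one (assumed).
TimerRegistry.prototype.create = function (reference, fireAt) {
    this.pending[reference] = fireAt;
};
// Returns true if a pending timer was removed, false if there was none.
TimerRegistry.prototype.cancel = function (reference) {
    if (Object.prototype.hasOwnProperty.call(this.pending, reference)) {
        delete this.pending[reference];
        return true;
    }
    return false;
};
// A timer that has fired is no longer pending.
TimerRegistry.prototype.fire = function (reference) {
    delete this.pending[reference];
};

var timers = new TimerRegistry();
timers.create("recurring_timer::1", new Date());
var canceledPending = timers.cancel("recurring_timer::1"); // true
var canceledAgain = timers.cancel("recurring_timer::1");   // false
timers.create("recurring_timer::1", new Date());
timers.fire("recurring_timer::1");
var canceledAfterFire = timers.cancel("recurring_timer::1"); // false
console.log(canceledPending, canceledAgain, canceledAfterFire);
```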
Preparations (Common):
For this example, two (2) buckets 'bulk' and 'rr100' are required, where the latter is intended to be 100% resident. Create the buckets with a minimum size of 100MB. For information on buckets, see Create a Bucket. Within the buckets we need two (2) keyspaces, 'bulk.data.source' and 'rr100.eventing.metadata' (we loosely follow this organization).
If you run a version of Couchbase prior to 7.0 you can just create the buckets 'source' and 'metadata' and run this example. Furthermore if your cluster was subsequently upgraded from say 6.6.2 to 7.0 your data would be moved to 'source._default._default' and 'metadata._default._default' and your Eventing Function would be seamlessly upgraded to use the new keyspaces and continue to run correctly.
For complete details on how to set up your keyspaces refer to creating buckets and creating scopes and collections.
The Eventing Storage keyspace, in this case 'rr100.eventing.metadata', is for the sole use of the Eventing system; do not add, modify, or delete documents from it. In addition, do not drop, flush, or delete the containing bucket (or delete this collection) while you have any deployed Eventing functions. In a single tenancy deployment this collection can be shared with other Eventing functions.
Setup:
-
Access the Couchbase Web Console > Buckets page.
-
You should see the following once you have created your buckets:
-
-
[Optional Step] Verify we have our empty collections:
-
Click the Scopes & Collections link of the bulk bucket (on the right).
-
Click the data scope name to expand the section (on the left).
-
You should see no user records.
-
-
Click the Documents link of the source collection (on the right).
-
Again you should see no user records.
-
Click Add Document in the upper right banner
-
For the ID in the Create New Document dialog specify recurring_timer::1
ID [ recurring_timer::1 ]
-
For the document body in the Create New Document dialog, the following text is displayed:
{ "click": "to edit", "with JSON": "there are no reserved field names" }
-
replace the above text with the following JSON document via a cut-n-paste
{ "type": "recurring_timer", "id": 1, "active": false }
-
Click Save.
-
-
From the Couchbase Web Console > Eventing page, click ADD FUNCTION, to add a new Function. The ADD FUNCTION dialog appears.
-
In the ADD FUNCTION dialog, for individual Function elements provide the below information:
-
For the Listen To Location drop-down, select bulk, data, source as the keyspace.
-
For the Eventing Storage drop-down, select rr100, eventing, metadata as the keyspace.
-
Enter external_rest_via_curl_get as the name of the Function you are creating in the Function Name text-box.
-
Leave the "Deployment Feed Boundary" as Everything.
-
[Optional Step] Enter text "Explore using an external REST endpoint to fetch daily data via a GET operation via a recurring timer. The initial fetch will be 30 seconds in the future the following fetches will be at the start of each subsequent day.", in the Description text-box.
-
For the Settings option, use the default values.
-
For the Bindings option, add two bindings.
-
For the first binding, select "bucket alias", specify src_col as the "alias name" of the collection, select bulk, data, source as the associated keyspace, and select "read and write" for the access mode.
-
For the second binding, select "URL alias", specify exchangeRateApi as the "alias name" of the URL, specify https://api.frankfurter.app/latest for the actual URL, and select "no auth" and no "cookies". Note that we won't be adding "latest" to the end of the URL; the Eventing Function's JavaScript will add YYYY-MM-DD for yesterday.
-
-
After configuring your settings the ADD FUNCTION dialog should look like this:
-
-
After providing all the required information in the ADD FUNCTION dialog, click Next: Add Code. The external_rest_via_curl_get dialog appears.
-
The external_rest_via_curl_get dialog initially contains a placeholder code block. You will substitute your actual external_rest_via_curl_get code in this block.
-
Copy the following Function, and paste it in the placeholder code block of external_rest_via_curl_get dialog.
function CreateRecurringTimer(context) {
    log('From CreateRecurringTimer: creating timer', context.mode, context.id);
    var nextSchedule = null;
    if (context.mode === "via_onupdate") {
        // Create a timestamp 30 seconds from now for the initial Timer
        var thirtySecFromNow = new Date(); // Get current time & add 30 sec. to it.
        thirtySecFromNow.setSeconds(thirtySecFromNow.getSeconds() + 30);
        nextSchedule = thirtySecFromNow;
    } else {
        // must be: context.mode === "via_callback"
        // Create UTC timestamp to fire a Timer for tomorrow do this for timers 2 to N
        var tomorrow = new Date();
        tomorrow.setHours(0,0,0,0);
        tomorrow.setDate(tomorrow.getDate() + 1);
        nextSchedule = tomorrow;
    }
    log("Finish CreateRecurringTimer (local time) nextSchedule", localISOTime(nextSchedule));
    createTimer(RecurringTimerCallback, nextSchedule, context.id, context);
}

function localISOTime (indate) {
    // JavaScript works with dates as UTC times, but sometimes we prefer to see local time
    return new Date(indate.getTime() - indate.getTimezoneOffset() * 60 * 1000)
        .toISOString().split(/[TZ]/).slice(0, 2).join('T');
}

function RecurringTimerCallback(context) {
    log('From RecurringTimerCallback: timer fired', context);
    // do any sort of recurring work here, just update a date_stamp in a doc
    var now = new Date();
    var nowLoc = localISOTime(now);
    var dt_beg = now.getTime();

    // Generate a YYYY-MM-DD string in UTC for Yesterday
    var yesterday = new Date();
    yesterday.setHours(0,0,0,0);
    yesterday.setDate(yesterday.getDate() - 1);
    var apiReqDateUtc = yesterday.toISOString().substring(0, 10);
    // Generate a YYYY-MM-DD string in Local Time for Yesterday
    var apiReqDateLoc = localISOTime(yesterday).substring(0, 10);

    try {
        // ==============================
        // Perform a cURL GET here
        var request = {
            path: apiReqDateUtc
        };
        // perform the cURL request using the URL alias from the settings
        var response = curl('GET', exchangeRateApi, request);
        var status = "OKAY";
        if (response.status != 200 && response.status != 302) {
            status = "FAIL";
        }
        // ==============================
        var curl_time_ms = new Date().getTime() - dt_beg;
        log('USER FUNCTION DONE ' + status + ' (curl ' + response.status + ' took ' + curl_time_ms + ' ms.)');
        if (response && response.body && response.body.date && response.body.base) {
            // write our exchange lookup table document, we will do this 365 times a year
            src_col["exchange::" + response.body.date] = response.body;
            // write status doc - we succeeded
            src_col["cur_" + context.id] = {
                "last_update_loc": nowLoc,
                "last_update_utc": now,
                "apiReqDateUtc": apiReqDateUtc,
                "curl_success": true,
                "valid": true,
                "curl_time_ms": curl_time_ms
            };
        } else {
            // write status doc - we failed
            src_col["cur_" + context.id] = {
                "last_update_loc": nowLoc,
                "last_update_utc": now,
                "apiReqDateUtc": apiReqDateUtc,
                "curl_success": true,
                "body_valid": false,
                "curl_time_ms": curl_time_ms
            };
        }
    } catch (e) {
        var curl_time_ms = new Date().getTime() - dt_beg;
        log('USER FUNCTION DONE ' + status + ' (curl ERROR ' + e + ' took ' + curl_time_ms + ' ms.)');
        // write status doc - we failed
        src_col["cur_" + context.id] = {
            "last_update_loc": nowLoc,
            "last_update_utc": now,
            "apiReqDateUtc": apiReqDateUtc,
            "curl_success": false,
            "body_valid": false,
            "curl_time_ms": curl_time_ms
        };
    }
    // rearm the timer
    CreateRecurringTimer({ "id": context.id, "mode": "via_callback" });
}

function OnUpdate(doc, meta) {
    // You would typically filter to mutations of interest
    if (doc.type !== 'recurring_timer') return;
    if (doc.active === false) {
        if (cancelTimer(RecurringTimerCallback, meta.id)) {
            log('From OnUpdate: canceled active Timer, doc.active', doc.active, meta.id);
        } else {
            log('From OnUpdate: no active Timer to cancel, doc.active', doc.active, meta.id);
        }
    } else {
        log('From OnUpdate: create/overwrite doc.active', doc.active, meta.id);
        CreateRecurringTimer({ "id": meta.id, "mode": "via_onupdate" });
    }
}
After pasting, the screen appears as displayed below:
-
Click Save and Return.
-
-
The OnUpdate routine specifies that when a change occurs to data within the "source" collection, actions will be processed according to the field within the document. First we ignore all documents that do not have a doc.type of "recurring_timer" — this is the control document. Next we use the field "active" to determine which action we take.
-
If "active" is true we will create a series of daily Timers that will fire. However the first timer will be approximately 30 seconds in the future from the time of deployment, for testing purposes.
-
If "active" is false we will cancel the existing Timer if any.
-
In the event a Timer created by this Function fires, the callback RecurringTimerCallback executes and writes a new document with a similar key (but with "cur_" prepended) into the "source" collection.
-
-
From the Eventing screen, click the external_rest_via_curl_get function to select it, then click Deploy.
-
In the Confirm Deploy Function dialog, click Deploy Function.
-
-
The Eventing function is deployed and starts running within a few seconds. From this point, the defined Function is executed on all existing documents, and more importantly it will also run on subsequent mutations.
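The routing that OnUpdate applies to each mutation (ignore documents that are not the control document, cancel when "active" is false, otherwise create or overwrite the timer) can be checked in isolation. The sketch below is a plain JavaScript illustration; the helper name decideAction and its string return values are hypothetical and are not part of the deployed Function.

```javascript
// Hypothetical helper mirroring the branch structure of OnUpdate.
function decideAction(doc) {
    if (!doc || doc.type !== "recurring_timer") {
        return "ignore"; // not a control document
    }
    // Only an explicit false cancels; any other value creates a timer.
    return doc.active === false ? "cancel" : "create";
}

console.log(decideAction({ type: "recurring_timer", id: 1, active: false })); // cancel
console.log(decideAction({ type: "recurring_timer", id: 1, active: true }));  // create
console.log(decideAction({ type: "exchange", base: "EUR" }));                 // ignore
```

Because the Function writes its own output documents into the same collection it listens to, the type filter is what keeps those writes from scheduling additional timers.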
Test 1: Create a Recurring Timer and allow the Timer to Fire and Rearm
-
Access the Couchbase Web Console > Documents page then select the keyspace bulk.data.source
-
Edit the control document recurring_timer::1 — it should look like this:
{ "type": "recurring_timer", "id": 1, "active": false }
Change "active" to true then click Save. This will create a mutation and the Function will generate the first of a series of recurring Timers. The control document is now:
{ "type": "recurring_timer", "id": 1, "active": true }
-
-
Access the Couchbase Web Console > Eventing page and if necessary select the Function external_rest_via_curl_get, then click the "Log" link for the Deployed Function to view the activity.
-
Here we see from the Application log that we created a timer. Note the log is in reverse order, and the bottom (or first) message was a NOOP, because doc.active was false when we first deployed and we tried to cancel any timer if it was running.
2021-07-18T14:37:25.136-07:00 [INFO] "Finish CreateRecurringTimer (local time) nextSchedule" "2021-07-18T14:37:55.134"
2021-07-18T14:37:25.134-07:00 [INFO] "From OnUpdate: create/overwrite doc.active" true "recurring_timer::1"
2021-07-18T14:37:25.134-07:00 [INFO] "From CreateRecurringTimer: creating timer" "via_onupdate" "recurring_timer::1"
2021-07-18T14:36:55.177-07:00 [INFO] "From OnUpdate: no active Timer to cancel, doc.active" false "recurring_timer::1"
-
-
Close the Function Log dialog, then wait about 2 minutes and click the "Log" link for Deployed Function external_rest_via_curl_get to view the activity again.
-
Here we see the timer fired and executed the callback RecurringTimerCallback near our scheduled time and is re-arming as expected (4 new lines).
2021-07-18T14:37:59.164-07:00 [INFO] "From CreateRecurringTimer: creating timer" "via_callback" "recurring_timer::1"
2021-07-18T14:37:59.164-07:00 [INFO] "Finish CreateRecurringTimer (local time) nextSchedule" "2021-07-19T00:00:00.000"
2021-07-18T14:37:59.161-07:00 [INFO] "USER FUNCTION DONE OKAY (curl 200 took 443 ms.)"
2021-07-18T14:37:58.718-07:00 [INFO] "From RecurringTimerCallback: timer fired" {"id":"recurring_timer::1","mode":"via_onupdate"}
-
-
Close the Function Log dialog, then to check the results of the callback, access the Couchbase Web Console > Documents page then select the keyspace bulk.data.source
-
Edit the new output status document cur_recurring_timer::1 (note the last_update field is in UTC) and you will see the data written by the Timer’s callback:
{ "last_update_loc": "2021-07-18T14:04:06.408", "last_update_utc": "2021-07-18T21:04:06.408Z", "apiReqDateUtc": "2021-07-17", "curl_success": true, "body_valid": false, "curl_time_ms": 442 }
-
Click Cancel to close the editor.
Note, above we have a local time of execution "last_update_loc", the UTC time of execution "last_update_utc", and the request date for the prior day "apiReqDateUtc".
-
Edit the new output document exchange::YYYY-MM-DD to see the captured exchange rate data. Below is the data for the key exchange::2021-07-16:
{ "amount": 1, "base": "EUR", "date": "2021-07-16", "rates": { "AUD": 1.5907, "BGN": 1.9558, "BRL": 6.0146, "CAD": 1.4856, "CHF": 1.0853, "CNY": 7.6373, "CZK": 25.538, "DKK": 7.4381, "GBP": 0.85298, "HKD": 9.1684, "HRK": 7.4968, "HUF": 359.73, "IDR": 17083, "ILS": 3.8796, "INR": 88.03, "ISK": 145.9, "JPY": 130.03, "KRW": 1347.94, "MXN": 23.459, "MYR": 4.9681, "NOK": 10.3878, "NZD": 1.6836, "PHP": 59.364, "PLN": 4.5867, "RON": 4.9285, "RUB": 87.52, "SEK": 10.2428, "SGD": 1.5993, "THB": 38.669, "TRY": 10.0521, "USD": 1.1802, "ZAR": 16.984 } }
-
Click Cancel to close the editor.
-
-
[Optional] Wait until the next morning and click the "Log" link for the Deployed Function external_rest_via_curl_get to view the activity. The code triggers the initial cur_recurring_timer::1 in 30 seconds, but for requests 2-N it switches to a daily basis.
-
Here we see the timer fired and executed the callback RecurringTimerCallback near our scheduled time for the next day and re-armed as expected.
2021-07-19T00:00:01.021-07:00 [INFO] "Finish CreateRecurringTimer (local time) nextSchedule" "2021-07-20T00:00:00.000"
2021-07-19T00:00:01.485-07:00 [INFO] "From CreateRecurringTimer: creating timer" "via_callback" "recurring_timer::1"
2021-07-19T00:00:01.487-07:00 [INFO] "USER FUNCTION DONE OKAY (curl 200 took 490 ms.)"
2021-07-19T00:00:00.003-07:00 [INFO] "From RecurringTimerCallback: timer fired" {"id":"recurring_timer::1","mode":"via_callback"}
Close the Function Log dialog, then to check the results of the callback, access the Couchbase Web Console > Documents page then select the keyspace bulk.data.source
-
Edit the new output status document cur_recurring_timer::1 (note the last_update field is in UTC — we are 7 hours behind UTC) and you will see the data written by the Timer’s callback:
{ "last_update_loc": "2021-07-19T00:01.021", "last_update_utc": "2021-07-19T07:01.021", "apiReqDateUtc": "2021-07-18", "curl_success": true, "valid": true, "curl_time_ms": 490 }
-
Click Cancel to close the editor.
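The status documents above carry the same instant twice, once in local time and once in UTC, plus the prior day's date used for the request. The sketch below reproduces those values for a fixed instant so the relationship is easy to verify. It is a simplified illustration: the helper isoInZone and its explicit offsetMinutes parameter are hypothetical (the Function itself reads the offset from getTimezoneOffset()), and the prior-day calculation here works purely in UTC.

```javascript
// Hypothetical helper: format a Date as local ISO text for a given offset.
// offsetMinutes follows Date.prototype.getTimezoneOffset(): 420 means UTC-7.
function isoInZone(date, offsetMinutes) {
    return new Date(date.getTime() - offsetMinutes * 60 * 1000)
        .toISOString().split(/[TZ]/).slice(0, 2).join('T');
}

var fired = new Date("2021-07-18T21:04:06.408Z");
var firedUtc = fired.toISOString();     // 2021-07-18T21:04:06.408Z
var firedLoc = isoInZone(fired, 420);   // 2021-07-18T14:04:06.408
// Prior day as YYYY-MM-DD, computed in UTC (simplified variant)
var priorDayUtc = new Date(fired.getTime() - 24 * 60 * 60 * 1000)
    .toISOString().substring(0, 10);    // 2021-07-17
console.log(firedUtc, firedLoc, priorDayUtc);
```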
-
Test 2: Cancel the Recurring Timer
-
Access the Couchbase Web Console > Documents page then select the keyspace bulk.data.source
-
Edit the control document recurring_timer::1 — it should look like this:
{ "type": "recurring_timer", "id": 1, "active": true }
Now change "active" to false to create a mutation, then click Save. The control document is now:
{ "type": "recurring_timer", "id": 1, "active": false }
-
-
Access the Couchbase Web Console > Eventing page and if necessary select the Function external_rest_via_curl_get, then click the "Log" link for the Deployed Function to view the activity.
-
Here we see from the Application log that we canceled the sequence — the recurring timer has stopped.
2021-07-19T09:09:29.639-07:00 [INFO] "From OnUpdate: canceled active Timer, doc.active" false "recurring_timer::1"
-
Cleanup:
Go to the Eventing portion of the UI and undeploy the Function external_rest_via_curl_get. This will remove the 1280 documents (2048 prior to 7.0.0) for the function from the 'rr100.eventing.metadata' collection (in the Bucket view of the UI). Remember you may only delete the 'rr100.eventing.metadata' keyspace if there are no deployed Eventing Functions.
Now flush the 'bulk' bucket if you plan to run other examples (you may need to Edit the bucket 'bulk' and enable the flush capability).