diff --git a/distributor/Ballerina.toml b/distributor/Ballerina.toml index 64099f8..1968901 100644 --- a/distributor/Ballerina.toml +++ b/distributor/Ballerina.toml @@ -3,7 +3,7 @@ org-name="eclk" version="0.1.0" [dependencies] -"maryamzi/websub.hub.mysqlstore" = "0.1.2" +"maryamzi/websub.hub.mysqlstore" = "0.2.0" [platform] target = "java8" diff --git a/distributor/src/distributor/listeners.bal b/distributor/src/distributor/listeners.bal index 2a5c36b..12ecfd8 100644 --- a/distributor/src/distributor/listeners.bal +++ b/distributor/src/distributor/listeners.bal @@ -1,8 +1,17 @@ +import ballerina/auth; import ballerina/config; import ballerina/http; # Listener for results tabulation to deliver results to us. listener http:Listener resultsListener = new (config:getAsInt("eclk.pub.port", 8181)); +http:BasicAuthHandler inboundBasicAuthHandler = new(new auth:InboundBasicAuthProvider()); + # Listener for media orgs to subscribe, for the website and for them to pull specific results. -listener http:Listener mediaListener = new (config:getAsInt("eclk.hub.port", 9090)); +listener http:Listener mediaListener = new (config:getAsInt("eclk.hub.port", 9090), config = { + auth: { + authHandlers: [inboundBasicAuthHandler], + mandateSecureSocket: false + }, + filters: [new SubscriptionFilter()] +}); diff --git a/distributor/src/distributor/main.bal b/distributor/src/distributor/main.bal index ee4ba2f..4a461f8 100644 --- a/distributor/src/distributor/main.bal +++ b/distributor/src/distributor/main.bal @@ -4,7 +4,7 @@ import ballerinax/java.jdbc; import maryamzi/websub.hub.mysqlstore; -websub:WebSubHub? hub = (); +websub:Hub? hub = (); public function main() returns error? { // create database connection to persist subscribers @@ -28,22 +28,34 @@ public function main() returns error? { // start the hub var hubStartUpResult = - websub:startHub(<@untainted> mediaListener, // weird BUG in ballerina compiler - { - hubPersistenceStore: persistenceStore, - clientConfig: { - // TODO: finalize - retryConfig: { - count: 3, - intervalInMillis: 5000 + check websub:startHub(<@untainted> mediaListener, // weird BUG in ballerina compiler + "/websub", "/hub", + serviceAuth = { + enabled: true }, - //followRedirects: { - // enabled: true, - // maxCount: 5 - //}, - timeoutInMillis: 5*60000 // Check - } - }); + publisherResourceAuth = { + enabled: true, + scopes: ["publish"] + }, + subscriptionResourceAuth = { + enabled: true, + scopes: ["subscribe"] + }, + hubConfiguration = { + hubPersistenceStore: persistenceStore, + clientConfig: { + // TODO: finalize + retryConfig: { + count: 3, + intervalInMillis: 5000 + }, + //followRedirects: { + // enabled: true, + // maxCount: 5 + //}, + timeoutInMillis: 5*60000 // Check + } + }); if hubStartUpResult is websub:HubStartedUpError { return error(ERROR_REASON, message = hubStartUpResult.message); diff --git a/distributor/src/distributor/results.bal b/distributor/src/distributor/results.bal index ec6f3f5..df568b7 100644 --- a/distributor/src/distributor/results.bal +++ b/distributor/src/distributor/results.bal @@ -89,7 +89,7 @@ function publishResultData(Result result) { } worker jsonWorker returns error? { - websub:WebSubHub wh = hub; // safe .. working around type guard limitation + websub:Hub wh = hub; // safe .. working around type guard limitation // push it out with the election code and the json result as the message json resultAll = { diff --git a/distributor/src/distributor/save.bal b/distributor/src/distributor/save.bal index 2a3d2b8..4a039bb 100644 --- a/distributor/src/distributor/save.bal +++ b/distributor/src/distributor/save.bal @@ -28,6 +28,15 @@ const UPDATE_RESULT_IMAGE = "UPDATE results SET imageMediaType = ?, imageData = const SELECT_RESULTS_DATA = "SELECT sequenceNo, election, code, type, jsonResult, imageMediaType, imageData FROM results"; const DROP_RESULTS_TABLE = "DROP TABLE results"; +const string CREATE_CALLBACKS_TABLE = "CREATE TABLE IF NOT EXISTS callbacks (" + + " username VARCHAR(100) NOT NULL," + + " callback VARCHAR(200) NOT NULL," + + " PRIMARY KEY (username, callback))"; +const INSERT_CALLBACK = "INSERT INTO callbacks (username, callback) VALUES (?, ?)"; +const UPDATE_CALLBACK = "UPDATE callbacks SET callback = ? WHERE username = ?"; +const SELECT_CALLBACKS = "SELECT * FROM callbacks"; +const DROP_CALLBACKS_TABLE = "DROP TABLE callbacks"; + jdbc:Client dbClient = new ({ url: config:getAsString("eclk.hub.db.url"), username: config:getAsString("eclk.hub.db.username"), @@ -47,11 +56,17 @@ type DataResult record {| byte[]? imageData; |}; +type UserCallback record {| + string username; + string callback; +|}; + # Create database and set up at module init time and load any data in there to # memory for the website to show. Panic if there's any issue. function __init() { // create tables _ = checkpanic dbClient->update(CREATE_RESULTS_TABLE); + _ = checkpanic dbClient->update(CREATE_CALLBACKS_TABLE); // load any results in there to our cache - the order will match the autoincrement and will be the sequence # table ret = checkpanic dbClient->select(SELECT_RESULTS_DATA, DataResult); @@ -78,6 +93,18 @@ function __init() { if (count > 0) { log:printInfo("Loaded " + count.toString() + " previous results from database"); } + + // load username-callback data for already added subscriptions + table callbackRet = checkpanic dbClient->select(SELECT_CALLBACKS, UserCallback); + count = 0; + while (callbackRet.hasNext()) { + UserCallback userCb = callbackRet.getNext(); + callbackMap[userCb.username] = <@untainted> userCb.callback; + count += 1; + } + if (count > 0) { + log:printInfo("Loaded " + count.toString() + " registered callback(s) from database"); + } } # Save an incoming result to make sure we don't lose it after getting it @@ -130,9 +157,26 @@ function saveImage(string electionCode, string resultCode, string mediaType, byt } } +# Save a subscription username-calback combination. +function saveUserCallback(string username, string callback) { + var r = dbClient->update(INSERT_CALLBACK, username, callback); + if r is error { + log:printError("Unable to save username-callback in database: ", r); + } +} + +# Update a subscription username-calback combination. +function updateUserCallback(string username, string callback) { + var r = dbClient->update(UPDATE_CALLBACK, callback, username); + if r is error { + log:printError("Unable to update username-callback in database: ", r); + } +} + # Clean everything from the DB and the in-memory cache # + return - error if something goes wrong function resetResults() returns error? { _ = check dbClient->update(DROP_RESULTS_TABLE); + _ = check dbClient->update(DROP_CALLBACKS_TABLE); __init(); } diff --git a/distributor/src/distributor/subscription_filter.bal b/distributor/src/distributor/subscription_filter.bal new file mode 100644 index 0000000..180e536 --- /dev/null +++ b/distributor/src/distributor/subscription_filter.bal @@ -0,0 +1,87 @@ +import ballerina/auth; +import ballerina/encoding; +import ballerina/http; +import ballerina/log; +import ballerina/websub; + +const HUB_TOPIC = "hub.topic"; +const HUB_CALLBACK = "hub.callback"; + +map callbackMap = {}; + +public type SubscriptionFilter object { + *http:RequestFilter; + + public function filterRequest(http:Caller caller, http:Request request, http:FilterContext context) returns boolean { + map|error params = request.getFormParams(); + + if params is error { + log:printError("error extracting form params", params); + return false; + } + + map paramMap = > params; + if !paramMap.hasKey(HUB_TOPIC) || !paramMap.hasKey(HUB_CALLBACK) { + log:printError("topic and/or callback not available"); + return false; + } + + string topic = paramMap.get(HUB_TOPIC); + string callback = paramMap.get(HUB_CALLBACK); + + string|error decodedTopic = encoding:decodeUriComponent(topic, "UTF-8"); + if (decodedTopic is string) { + topic = decodedTopic; + } else { + log:printWarn("error decoding topic, using the original form: " + topic + ". Error: " + decodedTopic.toString()); + } + + if (topic != JSON_RESULTS_TOPIC) { + log:printError("subscription request received for invalid topic " + topic); + return false; + } + + string|error decodedCallback = encoding:decodeUriComponent(callback, "UTF-8"); + if (decodedCallback is string) { + callback = decodedCallback; + } else { + log:printWarn("error decoding callback, using the original form: " + callback + ". Error: " + decodedCallback.toString()); + } + + websub:Hub hubVar = hub; + + if (!request.hasHeader(http:AUTH_HEADER)) { + return false; + } + + string headerValue = request.getHeader(http:AUTH_HEADER); + + if !(headerValue.startsWith(auth:AUTH_SCHEME_BASIC)) { + return false; + } + + string credential = headerValue.substring(5, headerValue.length()).trim(); + + var result = auth:extractUsernameAndPassword(credential); + + if (result is [string, string]) { + [string, string][username, _] = result; + + if callbackMap.hasKey(username) { + error? remResult = hubVar.removeSubscription(topic, callbackMap.get(username)); + log:printInfo("Removing existing subscription for username: " + username); + if (remResult is error) { + log:printError("error removing existing subscription for username: " + username, remResult); + } + updateUserCallback(username, callback); + } else { + saveUserCallback(username, callback); + } + callbackMap[username] = <@untainted> callback; + } else { + log:printError("Error extracting credentials", result); + return false; + } + return true; + } +}; diff --git a/subscriber/src/subscriber/subscriber.bal b/subscriber/src/subscriber/subscriber.bal index fd1c71f..b042fba 100644 --- a/subscriber/src/subscriber/subscriber.bal +++ b/subscriber/src/subscriber/subscriber.bal @@ -1,7 +1,8 @@ -import ballerina/log; -import ballerina/websub; +import ballerina/auth; import ballerina/http; import ballerina/io; +import ballerina/log; +import ballerina/websub; const MY_VERSION = "2019-11-02"; @@ -32,8 +33,12 @@ int subscriberPort = -1; boolean wantJson = false; boolean wantXml = false; +http:OutboundAuthConfig? auth = (); + // what formats does the user want results saved in? public function main (string secret, // secret to send to the hub + string? username = (), // my username + string? password = (), // my password boolean 'json = false, // do I want json? boolean 'xml = false, // do I want xml? string homeURL = "https://resultstest.ecdev.opensource.lk", // where do I subscribe at @@ -71,13 +76,25 @@ public function main (string secret, // secret to send to the hub // check whether this version is still supported hr = check hc->get("/isactive/" + MY_VERSION); if hr.statusCode != 200 { - io:println("*** This version of the subscriber is no longer supported!"); - return; + return error("*** This version of the subscriber is no longer supported!"); } // start the listener websub:Listener websubListener = new(subscriberPort); + if (username is string && password is string) { + auth:OutboundBasicAuthProvider outboundBasicAuthProvider = new ({ + username: <@untainted> username, + password: <@untainted> password + }); + + http:BasicAuthHandler outboundBasicAuthHandler = + new ( outboundBasicAuthProvider); + auth = { + authHandler: outboundBasicAuthHandler + }; + } + // attach JSON subscriber subscriberService = @websub:SubscriberServiceConfig { path: JSON_PATH, @@ -85,7 +102,10 @@ public function main (string secret, // secret to send to the hub target: [hub, JSON_TOPIC], leaseSeconds: TWO_DAYS_IN_SECONDS, secret: subscriberSecret, - callback: subscriberPublicUrl.concat(JSON_PATH) + callback: subscriberPublicUrl.concat(JSON_PATH), + hubClientConfig: { + auth: auth + } } service { resource function onNotification(websub:Notification notification) {