Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/ECLK/Results-Dist
Browse files Browse the repository at this point in the history
  • Loading branch information
sanjiva committed Nov 7, 2019
2 parents 37e7eef + f43acd8 commit d2d7d0d
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 24 deletions.
2 changes: 1 addition & 1 deletion distributor/Ballerina.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ org-name="eclk"
version="0.1.0"

[dependencies]
"maryamzi/websub.hub.mysqlstore" = "0.1.2"
"maryamzi/websub.hub.mysqlstore" = "0.2.0"

[platform]
target = "java8"
Expand Down
11 changes: 10 additions & 1 deletion distributor/src/distributor/listeners.bal
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import ballerina/auth;
import ballerina/config;
import ballerina/http;

# Listener for results tabulation to deliver results to us.
listener http:Listener resultsListener = new (config:getAsInt("eclk.pub.port", 8181));

http:BasicAuthHandler inboundBasicAuthHandler = new(new auth:InboundBasicAuthProvider());

# Listener for media orgs to subscribe, for the website and for them to pull specific results.
listener http:Listener mediaListener = new (config:getAsInt("eclk.hub.port", 9090));
listener http:Listener mediaListener = new (config:getAsInt("eclk.hub.port", 9090), config = {
auth: {
authHandlers: [inboundBasicAuthHandler],
mandateSecureSocket: false
},
filters: [new SubscriptionFilter()]
});
44 changes: 28 additions & 16 deletions distributor/src/distributor/main.bal
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import ballerinax/java.jdbc;

import maryamzi/websub.hub.mysqlstore;

websub:WebSubHub? hub = ();
websub:Hub? hub = ();

public function main() returns error? {
// create database connection to persist subscribers
Expand All @@ -28,22 +28,34 @@ public function main() returns error? {

// start the hub
var hubStartUpResult =
websub:startHub(<@untainted> mediaListener, // weird BUG in ballerina compiler
{
hubPersistenceStore: persistenceStore,
clientConfig: {
// TODO: finalize
retryConfig: {
count: 3,
intervalInMillis: 5000
check websub:startHub(<@untainted> mediaListener, // weird BUG in ballerina compiler
"/websub", "/hub",
serviceAuth = {
enabled: true
},
//followRedirects: {
// enabled: true,
// maxCount: 5
//},
timeoutInMillis: 5*60000 // Check
}
});
publisherResourceAuth = {
enabled: true,
scopes: ["publish"]
},
subscriptionResourceAuth = {
enabled: true,
scopes: ["subscribe"]
},
hubConfiguration = {
hubPersistenceStore: persistenceStore,
clientConfig: {
// TODO: finalize
retryConfig: {
count: 3,
intervalInMillis: 5000
},
//followRedirects: {
// enabled: true,
// maxCount: 5
//},
timeoutInMillis: 5*60000 // Check
}
});

if hubStartUpResult is websub:HubStartedUpError {
return error(ERROR_REASON, message = hubStartUpResult.message);
Expand Down
2 changes: 1 addition & 1 deletion distributor/src/distributor/results.bal
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ function publishResultData(Result result) {
}

worker jsonWorker returns error? {
websub:WebSubHub wh = <websub:WebSubHub> hub; // safe .. working around type guard limitation
websub:Hub wh = <websub:Hub> hub; // safe .. working around type guard limitation

// push it out with the election code and the json result as the message
json resultAll = {
Expand Down
44 changes: 44 additions & 0 deletions distributor/src/distributor/save.bal
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ const UPDATE_RESULT_IMAGE = "UPDATE results SET imageMediaType = ?, imageData =
const SELECT_RESULTS_DATA = "SELECT sequenceNo, election, code, type, jsonResult, imageMediaType, imageData FROM results";
const DROP_RESULTS_TABLE = "DROP TABLE results";

const string CREATE_CALLBACKS_TABLE = "CREATE TABLE IF NOT EXISTS callbacks (" +
" username VARCHAR(100) NOT NULL," +
" callback VARCHAR(200) NOT NULL," +
" PRIMARY KEY (username, callback))";
const INSERT_CALLBACK = "INSERT INTO callbacks (username, callback) VALUES (?, ?)";
const UPDATE_CALLBACK = "UPDATE callbacks SET callback = ? WHERE username = ?";
const SELECT_CALLBACKS = "SELECT * FROM callbacks";
const DROP_CALLBACKS_TABLE = "DROP TABLE callbacks";

jdbc:Client dbClient = new ({
url: config:getAsString("eclk.hub.db.url"),
username: config:getAsString("eclk.hub.db.username"),
Expand All @@ -47,11 +56,17 @@ type DataResult record {|
byte[]? imageData;
|};

type UserCallback record {|
string username;
string callback;
|};

# Create database and set up at module init time and load any data in there to
# memory for the website to show. Panic if there's any issue.
function __init() {
// create tables
_ = checkpanic dbClient->update(CREATE_RESULTS_TABLE);
_ = checkpanic dbClient->update(CREATE_CALLBACKS_TABLE);

// load any results in there to our cache - the order will match the autoincrement and will be the sequence #
table<DataResult> ret = checkpanic dbClient->select(SELECT_RESULTS_DATA, DataResult);
Expand All @@ -78,6 +93,18 @@ function __init() {
if (count > 0) {
log:printInfo("Loaded " + count.toString() + " previous results from database");
}

// load username-callback data for already added subscriptions
table<UserCallback> callbackRet = checkpanic dbClient->select(SELECT_CALLBACKS, UserCallback);
count = 0;
while (callbackRet.hasNext()) {
UserCallback userCb = <UserCallback> callbackRet.getNext();
callbackMap[userCb.username] = <@untainted> userCb.callback;
count += 1;
}
if (count > 0) {
log:printInfo("Loaded " + count.toString() + " registered callback(s) from database");
}
}

# Save an incoming result to make sure we don't lose it after getting it
Expand Down Expand Up @@ -130,9 +157,26 @@ function saveImage(string electionCode, string resultCode, string mediaType, byt
}
}

# Save a subscription username-calback combination.
function saveUserCallback(string username, string callback) {
var r = dbClient->update(INSERT_CALLBACK, username, callback);
if r is error {
log:printError("Unable to save username-callback in database: ", r);
}
}

# Update a subscription username-calback combination.
function updateUserCallback(string username, string callback) {
var r = dbClient->update(UPDATE_CALLBACK, callback, username);
if r is error {
log:printError("Unable to update username-callback in database: ", r);
}
}

# Clean everything from the DB and the in-memory cache
# + return - error if something goes wrong
function resetResults() returns error? {
_ = check dbClient->update(DROP_RESULTS_TABLE);
_ = check dbClient->update(DROP_CALLBACKS_TABLE);
__init();
}
87 changes: 87 additions & 0 deletions distributor/src/distributor/subscription_filter.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import ballerina/auth;
import ballerina/encoding;
import ballerina/http;
import ballerina/log;
import ballerina/websub;

const HUB_TOPIC = "hub.topic";
const HUB_CALLBACK = "hub.callback";

map<string> callbackMap = {};

public type SubscriptionFilter object {
*http:RequestFilter;

public function filterRequest(http:Caller caller, http:Request request, http:FilterContext context) returns boolean {
map<string>|error params = request.getFormParams();

if params is error {
log:printError("error extracting form params", params);
return false;
}

map<string> paramMap = <map<string>> params;
if !paramMap.hasKey(HUB_TOPIC) || !paramMap.hasKey(HUB_CALLBACK) {
log:printError("topic and/or callback not available");
return false;
}

string topic = paramMap.get(HUB_TOPIC);
string callback = paramMap.get(HUB_CALLBACK);

string|error decodedTopic = encoding:decodeUriComponent(topic, "UTF-8");
if (decodedTopic is string) {
topic = decodedTopic;
} else {
log:printWarn("error decoding topic, using the original form: " + topic + ". Error: " + decodedTopic.toString());
}

if (topic != JSON_RESULTS_TOPIC) {
log:printError("subscription request received for invalid topic " + topic);
return false;
}

string|error decodedCallback = encoding:decodeUriComponent(callback, "UTF-8");
if (decodedCallback is string) {
callback = decodedCallback;
} else {
log:printWarn("error decoding callback, using the original form: " + callback + ". Error: " + decodedCallback.toString());
}

websub:Hub hubVar = <websub:Hub> hub;

if (!request.hasHeader(http:AUTH_HEADER)) {
return false;
}

string headerValue = request.getHeader(http:AUTH_HEADER);

if !(headerValue.startsWith(auth:AUTH_SCHEME_BASIC)) {
return false;
}

string credential = headerValue.substring(5, headerValue.length()).trim();

var result = auth:extractUsernameAndPassword(credential);

if (result is [string, string]) {
[string, string][username, _] = result;

if callbackMap.hasKey(username) {
error? remResult = hubVar.removeSubscription(topic, callbackMap.get(username));
log:printInfo("Removing existing subscription for username: " + username);
if (remResult is error) {
log:printError("error removing existing subscription for username: " + username, remResult);
}
updateUserCallback(username, callback);
} else {
saveUserCallback(username, callback);
}
callbackMap[username] = <@untainted> callback;
} else {
log:printError("Error extracting credentials", result);
return false;
}
return true;
}
};
30 changes: 25 additions & 5 deletions subscriber/src/subscriber/subscriber.bal
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import ballerina/log;
import ballerina/websub;
import ballerina/auth;
import ballerina/http;
import ballerina/io;
import ballerina/log;
import ballerina/websub;

const MY_VERSION = "2019-11-02";

Expand Down Expand Up @@ -32,8 +33,12 @@ int subscriberPort = -1;
boolean wantJson = false;
boolean wantXml = false;

http:OutboundAuthConfig? auth = ();

// what formats does the user want results saved in?
public function main (string secret, // secret to send to the hub
string? username = (), // my username
string? password = (), // my password
boolean 'json = false, // do I want json?
boolean 'xml = false, // do I want xml?
string homeURL = "https://resultstest.ecdev.opensource.lk", // where do I subscribe at
Expand Down Expand Up @@ -71,21 +76,36 @@ public function main (string secret, // secret to send to the hub
// check whether this version is still supported
hr = check hc->get("/isactive/" + MY_VERSION);
if hr.statusCode != 200 {
io:println("*** This version of the subscriber is no longer supported!");
return;
return error("*** This version of the subscriber is no longer supported!");
}

// start the listener
websub:Listener websubListener = new(subscriberPort);

if (username is string && password is string) {
auth:OutboundBasicAuthProvider outboundBasicAuthProvider = new ({
username: <@untainted> username,
password: <@untainted> password
});

http:BasicAuthHandler outboundBasicAuthHandler =
new (<auth:OutboundBasicAuthProvider> outboundBasicAuthProvider);
auth = {
authHandler: outboundBasicAuthHandler
};
}

// attach JSON subscriber
subscriberService = @websub:SubscriberServiceConfig {
path: JSON_PATH,
subscribeOnStartUp: true,
target: [hub, JSON_TOPIC],
leaseSeconds: TWO_DAYS_IN_SECONDS,
secret: subscriberSecret,
callback: subscriberPublicUrl.concat(JSON_PATH)
callback: subscriberPublicUrl.concat(JSON_PATH),
hubClientConfig: {
auth: auth
}
}
service {
resource function onNotification(websub:Notification notification) {
Expand Down

0 comments on commit d2d7d0d

Please sign in to comment.