Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to b7a 1.0.3 and introduce basic auth for the hub service #70

Merged
merged 9 commits into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distributor/Ballerina.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ org-name="eclk"
version="0.1.0"

[dependencies]
"maryamzi/websub.hub.mysqlstore" = "0.1.2"
"maryamzi/websub.hub.mysqlstore" = "0.2.0"

[platform]
target = "java8"
Expand Down
11 changes: 10 additions & 1 deletion distributor/src/distributor/listeners.bal
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import ballerina/auth;
import ballerina/config;
import ballerina/http;

# Listener for results tabulation to deliver results to us.
listener http:Listener resultsListener = new (config:getAsInt("eclk.pub.port", 8181));

http:BasicAuthHandler inboundBasicAuthHandler = new(new auth:InboundBasicAuthProvider());

# Listener for media orgs to subscribe, for the website and for them to pull specific results.
listener http:Listener mediaListener = new (config:getAsInt("eclk.hub.port", 9090));
listener http:Listener mediaListener = new (config:getAsInt("eclk.hub.port", 9090), config = {
auth: {
authHandlers: [inboundBasicAuthHandler],
mandateSecureSocket: false
},
filters: [new SubscriptionFilter()]
});
44 changes: 28 additions & 16 deletions distributor/src/distributor/main.bal
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import ballerinax/java.jdbc;

import maryamzi/websub.hub.mysqlstore;

websub:WebSubHub? hub = ();
websub:Hub? hub = ();

public function main() returns error? {
// create database connection to persist subscribers
Expand All @@ -28,22 +28,34 @@ public function main() returns error? {

// start the hub
var hubStartUpResult =
websub:startHub(<@untainted> mediaListener, // weird BUG in ballerina compiler
{
hubPersistenceStore: persistenceStore,
clientConfig: {
// TODO: finalize
retryConfig: {
count: 3,
intervalInMillis: 5000
check websub:startHub(<@untainted> mediaListener, // weird BUG in ballerina compiler
"/websub", "/hub",
serviceAuth = {
enabled: true
},
//followRedirects: {
// enabled: true,
// maxCount: 5
//},
timeoutInMillis: 5*60000 // Check
}
});
publisherResourceAuth = {
enabled: true,
scopes: ["publish"]
},
subscriptionResourceAuth = {
enabled: true,
scopes: ["subscribe"]
},
hubConfiguration = {
hubPersistenceStore: persistenceStore,
clientConfig: {
// TODO: finalize
retryConfig: {
count: 3,
intervalInMillis: 5000
},
//followRedirects: {
// enabled: true,
// maxCount: 5
//},
timeoutInMillis: 5*60000 // Check
}
});

if hubStartUpResult is websub:HubStartedUpError {
return error(ERROR_REASON, message = hubStartUpResult.message);
Expand Down
2 changes: 1 addition & 1 deletion distributor/src/distributor/results.bal
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ function publishResultData(Result result) {
}

worker jsonWorker returns error? {
websub:WebSubHub wh = <websub:WebSubHub> hub; // safe .. working around type guard limitation
websub:Hub wh = <websub:Hub> hub; // safe .. working around type guard limitation

// push it out with the election code and the json result as the message
json resultAll = {
Expand Down
44 changes: 44 additions & 0 deletions distributor/src/distributor/save.bal
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ const UPDATE_RESULT_IMAGE = "UPDATE results SET imageMediaType = ?, imageData =
const SELECT_RESULTS_DATA = "SELECT sequenceNo, election, code, type, jsonResult, imageMediaType, imageData FROM results";
const DROP_RESULTS_TABLE = "DROP TABLE results";

const string CREATE_CALLBACKS_TABLE = "CREATE TABLE IF NOT EXISTS callbacks (" +
" username VARCHAR(100) NOT NULL," +
" callback VARCHAR(200) NOT NULL," +
" PRIMARY KEY (username, callback))";
const INSERT_CALLBACK = "INSERT INTO callbacks (username, callback) VALUES (?, ?)";
const UPDATE_CALLBACK = "UPDATE callbacks SET callback = ? WHERE username = ?";
const SELECT_CALLBACKS = "SELECT * FROM callbacks";
const DROP_CALLBACKS_TABLE = "DROP TABLE callbacks";

jdbc:Client dbClient = new ({
url: config:getAsString("eclk.hub.db.url"),
username: config:getAsString("eclk.hub.db.username"),
Expand All @@ -47,11 +56,17 @@ type DataResult record {|
byte[]? imageData;
|};

type UserCallback record {|
string username;
string callback;
|};

# Create database and set up at module init time and load any data in there to
# memory for the website to show. Panic if there's any issue.
function __init() {
// create tables
_ = checkpanic dbClient->update(CREATE_RESULTS_TABLE);
_ = checkpanic dbClient->update(CREATE_CALLBACKS_TABLE);

// load any results in there to our cache - the order will match the autoincrement and will be the sequence #
table<DataResult> ret = checkpanic dbClient->select(SELECT_RESULTS_DATA, DataResult);
Expand All @@ -78,6 +93,18 @@ function __init() {
if (count > 0) {
log:printInfo("Loaded " + count.toString() + " previous results from database");
}

// load username-callback data for already added subscriptions
table<UserCallback> callbackRet = checkpanic dbClient->select(SELECT_CALLBACKS, UserCallback);
count = 0;
while (callbackRet.hasNext()) {
UserCallback userCb = <UserCallback> callbackRet.getNext();
callbackMap[userCb.username] = <@untainted> userCb.callback;
count += 1;
}
if (count > 0) {
log:printInfo("Loaded " + count.toString() + " registered callback(s) from database");
}
}

# Save an incoming result to make sure we don't lose it after getting it
Expand Down Expand Up @@ -130,9 +157,26 @@ function saveImage(string electionCode, string resultCode, string mediaType, byt
}
}

# Save a subscription username-calback combination.
function saveUserCallback(string username, string callback) {
var r = dbClient->update(INSERT_CALLBACK, username, callback);
if r is error {
log:printError("Unable to save username-callback in database: ", r);
}
}

# Update a subscription username-calback combination.
function updateUserCallback(string username, string callback) {
var r = dbClient->update(UPDATE_CALLBACK, callback, username);
if r is error {
log:printError("Unable to update username-callback in database: ", r);
}
}

# Clean everything from the DB and the in-memory cache
# + return - error if something goes wrong
function resetResults() returns error? {
_ = check dbClient->update(DROP_RESULTS_TABLE);
_ = check dbClient->update(DROP_CALLBACKS_TABLE);
__init();
}
87 changes: 87 additions & 0 deletions distributor/src/distributor/subscription_filter.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import ballerina/auth;
import ballerina/encoding;
import ballerina/http;
import ballerina/log;
import ballerina/websub;

const HUB_TOPIC = "hub.topic";
const HUB_CALLBACK = "hub.callback";

map<string> callbackMap = {};

public type SubscriptionFilter object {
*http:RequestFilter;

public function filterRequest(http:Caller caller, http:Request request, http:FilterContext context) returns boolean {
map<string>|error params = request.getFormParams();

if params is error {
log:printError("error extracting form params", params);
return false;
}

map<string> paramMap = <map<string>> params;
if !paramMap.hasKey(HUB_TOPIC) || !paramMap.hasKey(HUB_CALLBACK) {
log:printError("topic and/or callback not available");
return false;
}

string topic = paramMap.get(HUB_TOPIC);
string callback = paramMap.get(HUB_CALLBACK);

string|error decodedTopic = encoding:decodeUriComponent(topic, "UTF-8");
if (decodedTopic is string) {
topic = decodedTopic;
} else {
log:printWarn("error decoding topic, using the original form: " + topic + ". Error: " + decodedTopic.toString());
}

if (topic != JSON_RESULTS_TOPIC) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to support image sending too .. so user has to subscribe to both the JSON and the image.

See: #48

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can fix this after merging this code.

log:printError("subscription request received for invalid topic " + topic);
return false;
}

string|error decodedCallback = encoding:decodeUriComponent(callback, "UTF-8");
if (decodedCallback is string) {
callback = decodedCallback;
} else {
log:printWarn("error decoding callback, using the original form: " + callback + ". Error: " + decodedCallback.toString());
}

websub:Hub hubVar = <websub:Hub> hub;

if (!request.hasHeader(http:AUTH_HEADER)) {
return false;
}

string headerValue = request.getHeader(http:AUTH_HEADER);

if !(headerValue.startsWith(auth:AUTH_SCHEME_BASIC)) {
return false;
}

string credential = headerValue.substring(5, headerValue.length()).trim();

var result = auth:extractUsernameAndPassword(credential);

if (result is [string, string]) {
[string, string][username, _] = result;

if callbackMap.hasKey(username) {
error? remResult = hubVar.removeSubscription(topic, callbackMap.get(username));
log:printInfo("Removing existing subscription for username: " + username);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Show the callback URL

if (remResult is error) {
log:printError("error removing existing subscription for username: " + username, remResult);
}
updateUserCallback(username, callback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log showing that we added a new subscription and show the callback URL

} else {
saveUserCallback(username, callback);
}
callbackMap[username] = <@untainted> callback;
} else {
log:printError("Error extracting credentials", result);
return false;
}
return true;
}
};
30 changes: 25 additions & 5 deletions subscriber/src/subscriber/subscriber.bal
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import ballerina/log;
import ballerina/websub;
import ballerina/auth;
import ballerina/http;
import ballerina/io;
import ballerina/log;
import ballerina/websub;

const MY_VERSION = "2019-11-02";

Expand Down Expand Up @@ -32,8 +33,12 @@ int subscriberPort = -1;
boolean wantJson = false;
boolean wantXml = false;

http:OutboundAuthConfig? auth = ();

// what formats does the user want results saved in?
public function main (string secret, // secret to send to the hub
string? username = (), // my username
string? password = (), // my password
boolean 'json = false, // do I want json?
boolean 'xml = false, // do I want xml?
string homeURL = "http://resultstest.ecdev.opensource.lk", // where do I subscribe at
Expand Down Expand Up @@ -71,21 +76,36 @@ public function main (string secret, // secret to send to the hub
// check whether this version is still supported
hr = check hc->get("/isactive/" + MY_VERSION);
if hr.statusCode != 200 {
io:println("*** This version of the subscriber is no longer supported!");
return;
return error("*** This version of the subscriber is no longer supported!");
sanjiva marked this conversation as resolved.
Show resolved Hide resolved
}

// start the listener
websub:Listener websubListener = new(subscriberPort);

if (username is string && password is string) {
auth:OutboundBasicAuthProvider outboundBasicAuthProvider = new ({
username: <@untainted> username,
password: <@untainted> password
});

http:BasicAuthHandler outboundBasicAuthHandler =
new (<auth:OutboundBasicAuthProvider> outboundBasicAuthProvider);
auth = {
authHandler: outboundBasicAuthHandler
};
}

// attach JSON subscriber
subscriberService = @websub:SubscriberServiceConfig {
path: JSON_PATH,
subscribeOnStartUp: true,
target: [hub, JSON_TOPIC],
leaseSeconds: TWO_DAYS_IN_SECONDS,
secret: subscriberSecret,
callback: subscriberPublicUrl.concat(JSON_PATH)
callback: subscriberPublicUrl.concat(JSON_PATH),
hubClientConfig: {
auth: auth
}
}
service {
resource function onNotification(websub:Notification notification) {
Expand Down