Skip to content

Commit

Permalink
Refactoring of the ERS3UploadQueue and ERCloudFilesUploadQueue to sha…
Browse files Browse the repository at this point in the history
…re the same implementation and avoid accidental death of the queue thread

The ERS3UploadQueue and ERCloudFilesUploadQueue are clearly duplicated. This commit introduces an abstract queue that handles all the logic to upload files asynchronously to a remote server. The upload process is unchanged. The queue logic was improved to perform all the upload logic (preparation + execution) inside of a try-catch block. This small change makes the queue much more robust by avoiding accidental death of the queue thread when an exception is thrown during the preparation phase.
  • Loading branch information
hprange committed Feb 14, 2014
1 parent f0bafe5 commit 63ae538
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.rackspacecloud.client.cloudfiles.FilesClient;
import com.webobjects.eocontrol.EOEditingContext;

import er.attachment.upload.ERRemoteAttachment;
import er.extensions.eof.ERXGenericRecord;
import er.extensions.foundation.ERXProperties;

Expand All @@ -17,7 +18,7 @@
*
* @author probert
*/
public class ERCloudFilesAttachment extends _ERCloudFilesAttachment {
public class ERCloudFilesAttachment extends _ERCloudFilesAttachment implements ERRemoteAttachment {
@SuppressWarnings("unused")
private static Logger log = Logger.getLogger(ERCloudFilesAttachment.class);
public static final String STORAGE_TYPE = "cf";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.amazon.s3.QueryStringAuthGenerator;
import com.webobjects.eocontrol.EOEditingContext;

import er.attachment.upload.ERRemoteAttachment;
import er.extensions.eof.ERXGenericRecord;
import er.extensions.foundation.ERXProperties;

Expand All @@ -24,7 +25,7 @@
*
* @author mschrag
*/
public class ERS3Attachment extends _ERS3Attachment {
public class ERS3Attachment extends _ERS3Attachment implements ERRemoteAttachment {
/**
* Do I need to update serialVersionUID?
* See section 5.6 <cite>Type Changes Affecting Serialization</cite> on page 51 of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@

import er.attachment.ERAttachmentRequestHandler;
import er.attachment.model.ERCloudFilesAttachment;
import er.extensions.concurrency.ERXAsyncQueue;
import er.extensions.eof.ERXEC;
import er.extensions.foundation.ERXExceptionUtilities;
import er.attachment.upload.ERAttachmentUploadQueue;
import er.extensions.foundation.ERXProperties;

/**
Expand All @@ -49,7 +47,7 @@ public class ERCloudFilesAttachmentProcessor extends
private ERCloudFilesUploadQueue _queue;

public ERCloudFilesAttachmentProcessor() {
_queue = new ERCloudFilesUploadQueue();
_queue = new ERCloudFilesUploadQueue("ERCloudFilesAsyncQueue", this);
_queue.start();
}

Expand Down Expand Up @@ -232,115 +230,33 @@ public void performUpload(File uploadedFile, String originalFileName,

}

public class ERCloudFilesQueueEntry {
private File _uploadedFile;
private ERCloudFilesAttachment _attachment;

public ERCloudFilesQueueEntry(File uploadedFile, ERCloudFilesAttachment attachment) {
_uploadedFile = uploadedFile;
_attachment = attachment;
}

public File uploadedFile() {
return _uploadedFile;
public class ERCloudFilesUploadQueue extends ERAttachmentUploadQueue<ERCloudFilesAttachment> {
public ERCloudFilesUploadQueue(String name, ERAttachmentProcessor<ERCloudFilesAttachment> processor) {
super(name, processor);
}

public ERCloudFilesAttachment attachment() {
return _attachment;
}
}

public class ERCloudFilesUploadQueue extends ERXAsyncQueue<ERCloudFilesQueueEntry> {
private EOEditingContext _editingContext;

public ERCloudFilesUploadQueue() {
super("ERS3AsyncQueue");
_editingContext = ERXEC.newEditingContext();
}
@Override
protected void performUpload(ERCloudFilesAttachment attachment, File uploadedFile) throws Exception {
String bucket;
String key;
String mimeType;
String originalFileName = null;

public void enqueue(ERCloudFilesAttachment attachment) {
_editingContext.lock();

try {
ERCloudFilesAttachment localAttachment = attachment
.localInstanceIn(_editingContext);
ERCloudFilesQueueEntry entry = new ERCloudFilesQueueEntry(
attachment._pendingUploadFile(), localAttachment);
enqueue(entry);
bucket = attachment.container();
key = attachment.key();
mimeType = attachment.mimeType();

if (proxyAsAttachment(attachment)) {
originalFileName = attachment.originalFileName();
}
} finally {
_editingContext.unlock();
}
}

@Override
public void process(ERCloudFilesQueueEntry object) {
ERCloudFilesAttachment attachment = object.attachment();

File uploadedFile = object.uploadedFile();
if (uploadedFile != null && uploadedFile.exists()) {
String bucket;
String key;
String mimeType;
String configurationName;
String originalFileName = null;

_editingContext.lock();
try {
bucket = attachment.container();
key = attachment.key();
mimeType = attachment.mimeType();
configurationName = attachment.configurationName();
if (proxyAsAttachment(attachment)) {
originalFileName = attachment.originalFileName();
}
} finally {
_editingContext.unlock();
}

try {
performUpload(uploadedFile, originalFileName,
bucket, key, mimeType, attachment);

_editingContext.lock();
try {
attachment.setAvailable(Boolean.TRUE);
_editingContext.saveChanges();
} finally {
_editingContext.unlock();
}
if (delegate() != null) {
delegate()
.attachmentAvailable(
ERCloudFilesAttachmentProcessor.this,
attachment);
}
} catch (Throwable t) {
if (delegate() != null) {
delegate()
.attachmentNotAvailable(
ERCloudFilesAttachmentProcessor.this,
attachment,
ERXExceptionUtilities.toParagraph(t));
}
ERAttachmentProcessor.log.error("Failed to upload '"
+ uploadedFile + "' to CloudFiles.", t);
} finally {
if (attachment._isPendingDelete()) {
uploadedFile.delete();
}
}
} else {
if (delegate() != null) {
delegate()
.attachmentNotAvailable(
ERCloudFilesAttachmentProcessor.this,
attachment,
"Missing attachment file '" + uploadedFile
+ "'.");
}
ERAttachmentProcessor.log.error("Missing attachment file '"
+ uploadedFile + "'.");
}
((ERCloudFilesAttachmentProcessor)_processor).performUpload(uploadedFile, originalFileName, bucket, key, mimeType, attachment);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@

import er.attachment.ERAttachmentRequestHandler;
import er.attachment.model.ERS3Attachment;
import er.extensions.concurrency.ERXAsyncQueue;
import er.extensions.eof.ERXEC;
import er.extensions.foundation.ERXExceptionUtilities;
import er.attachment.upload.ERAttachmentUploadQueue;
import er.extensions.foundation.ERXProperties;

/**
Expand All @@ -51,7 +49,7 @@ public class ERS3AttachmentProcessor extends
private ERS3UploadQueue _queue;

public ERS3AttachmentProcessor() {
_queue = new ERS3UploadQueue();
_queue = new ERS3UploadQueue("ERS3AsyncQueue", this);
_queue.start();
}

Expand Down Expand Up @@ -233,115 +231,31 @@ protected boolean failed(Response response) throws IOException {
return (responseCode < 200 || responseCode >= 300);
}

public class ERS3QueueEntry {
private File _uploadedFile;
private ERS3Attachment _attachment;

public ERS3QueueEntry(File uploadedFile, ERS3Attachment attachment) {
_uploadedFile = uploadedFile;
_attachment = attachment;
}

public File uploadedFile() {
return _uploadedFile;
public class ERS3UploadQueue extends ERAttachmentUploadQueue<ERS3Attachment> {
public ERS3UploadQueue(String name, ERAttachmentProcessor<ERS3Attachment> processor) {
super(name, processor);
}

public ERS3Attachment attachment() {
return _attachment;
}
}

public class ERS3UploadQueue extends ERXAsyncQueue<ERS3QueueEntry> {
private EOEditingContext _editingContext;

public ERS3UploadQueue() {
super("ERS3AsyncQueue");
_editingContext = ERXEC.newEditingContext();
}
@Override
protected void performUpload(ERS3Attachment attachment, File uploadedFile) throws Exception {
String bucket;
String key;
String mimeType;
String originalFileName = null;

public void enqueue(ERS3Attachment attachment) {
_editingContext.lock();
try {
ERS3Attachment localAttachment = attachment
.localInstanceIn(_editingContext);
ERS3QueueEntry entry = new ERS3QueueEntry(
attachment._pendingUploadFile(), localAttachment);
enqueue(entry);
bucket = attachment.bucket();
key = attachment.key();
mimeType = attachment.mimeType();
if (proxyAsAttachment(attachment)) {
originalFileName = attachment.originalFileName();
}
} finally {
_editingContext.unlock();
}
}

@Override
public void process(ERS3QueueEntry object) {
ERS3Attachment attachment = object.attachment();

File uploadedFile = object.uploadedFile();
if (uploadedFile != null && uploadedFile.exists()) {
String bucket;
String key;
String mimeType;
String configurationName;
String originalFileName = null;

_editingContext.lock();
try {
bucket = attachment.bucket();
key = attachment.key();
mimeType = attachment.mimeType();
configurationName = attachment.configurationName();
if (proxyAsAttachment(attachment)) {
originalFileName = attachment.originalFileName();
}
} finally {
_editingContext.unlock();
}

try {
performUpload(uploadedFile, originalFileName,
bucket, key, mimeType, attachment);

_editingContext.lock();
try {
attachment.setAvailable(Boolean.TRUE);
_editingContext.saveChanges();
} finally {
_editingContext.unlock();
}
if (delegate() != null) {
delegate()
.attachmentAvailable(
ERS3AttachmentProcessor.this,
attachment);
}
} catch (Throwable t) {
if (delegate() != null) {
delegate()
.attachmentNotAvailable(
ERS3AttachmentProcessor.this,
attachment,
ERXExceptionUtilities.toParagraph(t));
}
ERAttachmentProcessor.log.error("Failed to upload '"
+ uploadedFile + "' to S3.", t);
} finally {
if (attachment._isPendingDelete()) {
uploadedFile.delete();
}
}
} else {
if (delegate() != null) {
delegate()
.attachmentNotAvailable(
ERS3AttachmentProcessor.this,
attachment,
"Missing attachment file '" + uploadedFile
+ "'.");
}
ERAttachmentProcessor.log.error("Missing attachment file '"
+ uploadedFile + "'.");
}
((ERS3AttachmentProcessor) _processor).performUpload(uploadedFile, originalFileName, bucket, key, mimeType, attachment);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package er.attachment.upload;

import java.io.File;

import er.attachment.model.ERAttachment;

/**
* The <code>ERAttachmentQueueEntry</code> is a wrapper that keeps a reference
* to the <code>ERAttachment</code> and the file being enqueued for uploading.
*
* @author <a href="mailto:hprange@gmail.com.br">Henrique Prange</a>
*
* @param <T>
* the type of the attachment that can queued for uploading.
*
* @see ERRemoteAttachment
* @see ERAttachmentUploadQueue
*/
public class ERAttachmentQueueEntry<T extends ERAttachment & ERRemoteAttachment> {
private File _uploadedFile;
private T _attachment;

public ERAttachmentQueueEntry(File uploadedFile, T attachment) {
_uploadedFile = uploadedFile;
_attachment = attachment;
}

public File uploadedFile() {
return _uploadedFile;
}

public T attachment() {
return _attachment;
}
}
Loading

0 comments on commit 63ae538

Please sign in to comment.