Skip to content

Commit

Permalink
Merge pull request #47 from stokito/tests_cleanup
Browse files Browse the repository at this point in the history
Tests cleanup and refactoring
  • Loading branch information
lvca committed Mar 28, 2015
2 parents 9b215d5 + b83262d commit 3ebba91
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package com.orientechnologies.orient.etl.transformer;

import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.index.OIndex;
import com.orientechnologies.orient.core.metadata.schema.OType;
Expand All @@ -39,9 +38,8 @@ public abstract class OAbstractLookupTransformer extends OAbstractTransformer {
protected Object joinValue;
protected String lookup;
protected ACTION unresolvedLinkAction = ACTION.NOTHING;
protected OSQLQuery<ODocument> sqlQuery;
protected OIndex<?> index;
protected ODatabaseDocumentTx db;
private OSQLQuery<ODocument> sqlQuery;
private OIndex<?> index;

protected enum ACTION {
NOTHING, WARNING, ERROR, HALT, SKIP, CREATE
Expand All @@ -63,15 +61,6 @@ public void configure(final OETLProcessor iProcessor, final ODocument iConfigura
unresolvedLinkAction = ACTION.valueOf(iConfiguration.field("unresolvedLinkAction").toString().toUpperCase());
}

@Override
public void begin() {
if (db == null) {
db = pipeline.getDocumentDatabase();
if (db == null)
throw new OTransformException("[" + getName() + " Transformer] database is not configured");
}
}

protected Object lookup(Object joinValue, final boolean iReturnRIDS) {
Object result = null;

Expand All @@ -81,7 +70,7 @@ protected Object lookup(Object joinValue, final boolean iReturnRIDS) {
if (lookup.toUpperCase().startsWith("SELECT"))
sqlQuery = new OSQLSynchQuery<ODocument>(lookup);
else {
index = db.getMetadata().getIndexManager().getIndex(lookup);
index = pipeline.getDocumentDatabase().getMetadata().getIndexManager().getIndex(lookup);
if (index == null) {
log(OETLProcessor.LOG_LEVELS.DEBUG, "WARNING: index %s not found. Lookups could be really slow", lookup);
final String[] parts = lookup.split("\\.");
Expand All @@ -95,7 +84,7 @@ protected Object lookup(Object joinValue, final boolean iReturnRIDS) {
joinValue = OType.convert(joinValue, idxFieldType.getDefaultJavaType());
result = index.get(joinValue);
} else
result = db.query(sqlQuery, joinValue);
result = pipeline.getDocumentDatabase().query(sqlQuery, joinValue);

if (result != null)
if (result instanceof Collection) {
Expand All @@ -108,7 +97,6 @@ protected Object lookup(Object joinValue, final boolean iReturnRIDS) {

if (iReturnRIDS && result instanceof ORecord)
result = ((ORecord) result).getIdentity();

}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ public Object transform(final Object input) {
log(OETLProcessor.LOG_LEVELS.DEBUG, "Transformer output: %s", result);
return result;
}

context.setVariable(output, result);
}

log(OETLProcessor.LOG_LEVELS.DEBUG, "Transformer output (same as input): %s", input);
return input;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Pass-through Transformer that execute a block.
*/
public class OBlockTransformer extends OAbstractTransformer {
protected OBlock block;
private OBlock block;

@Override
public ODocument getConfiguration() {
Expand Down Expand Up @@ -64,9 +64,7 @@ public void setPipeline(OETLPipeline iPipeline) {
@Override
protected Object executeTransform(final Object input) {
context.setVariable("input", input);

block.execute();

return input;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import java.util.List;

public class OCSVTransformer extends OAbstractTransformer {
protected char separator = ',';
protected boolean columnsOnFirstLine = true;
protected List<String> columnNames = null;
protected List<OType> columnTypes = null;
protected long skipFrom = -1;
protected long skipTo = -1;
protected long line = -1;
protected String nullValue;
protected char stringCharacter = '"';
private char separator = ',';
private boolean columnsOnFirstLine = true;
private List<String> columnNames = null;
private List<OType> columnTypes = null;
private long skipFrom = -1;
private long skipTo = -1;
private long line = -1;
private String nullValue;
private char stringCharacter = '"';

@Override
public ODocument getConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@
* Executes arbitrary code in any supported language by JVM.
*/
public class OCodeTransformer extends OAbstractTransformer {
protected String language = "javascript";
protected String code;
protected OCommandExecutorScript cmd;
protected Map<Object, Object> params = new HashMap<Object, Object>();
private String language = "javascript";
private OCommandExecutorScript cmd;
private final Map<Object, Object> params = new HashMap<Object, Object>();

@Override
public ODocument getConfiguration() {
Expand All @@ -50,6 +49,7 @@ public void configure(OETLProcessor iProcessor, final ODocument iConfiguration,
if (iConfiguration.containsField("language"))
language = iConfiguration.field("language");

String code;
if (iConfiguration.containsField("code"))
code = iConfiguration.field("code");
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
* Executes a command.
*/
public class OCommandTransformer extends OAbstractTransformer {
protected String language = "sql";
protected String command;
private String language = "sql";
private String command;

@Override
public ODocument getConfiguration() {
Expand All @@ -56,25 +56,19 @@ public String getName() {

@Override
public Object executeTransform(final Object input) {

final OCommandRequest cmd;

String runtimeCommand = (String) resolve(command);

final OCommandRequest cmd;
if (language.equals("sql")) {
cmd = new OCommandSQL(runtimeCommand);
log(OETLProcessor.LOG_LEVELS.DEBUG, "executing command=%s...", runtimeCommand);
} else if (language.equals("gremlin")) {
cmd = new OCommandGremlin(runtimeCommand);
} else
} else {
cmd = new OCommandScript(language, runtimeCommand);

}
cmd.setContext(context);

Object result = pipeline.getDocumentDatabase().command(cmd).execute();

log(OETLProcessor.LOG_LEVELS.DEBUG, "executed command=%s, result=%s", cmd, result);

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.etl.OETLProcessHaltedException;
import com.orientechnologies.orient.etl.OETLProcessor;
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;
import com.tinkerpop.blueprints.impls.orient.OrientEdge;
import com.tinkerpop.blueprints.impls.orient.OrientVertex;

public class OEdgeTransformer extends OAbstractLookupTransformer {
protected OrientBaseGraph graph;
protected String edgeClass = "E";
protected boolean directionOut = true;
private String edgeClass = "E";
private boolean directionOut = true;

@Override
public ODocument getConfiguration() {
Expand All @@ -51,7 +49,6 @@ public ODocument getConfiguration() {
public void configure(OETLProcessor iProcessor, final ODocument iConfiguration, final OBasicCommandContext iContext) {
super.configure(iProcessor, iConfiguration, iContext);
edgeClass = iConfiguration.field("class");

if (iConfiguration.containsField("direction")) {
final String direction = iConfiguration.field("direction");
if ("out".equalsIgnoreCase(direction))
Expand All @@ -60,7 +57,6 @@ else if ("in".equalsIgnoreCase(direction))
directionOut = false;
else
throw new OConfigurationException("Direction can be 'in' or 'out', but found: " + direction);

}
}

Expand All @@ -71,31 +67,20 @@ public String getName() {

@Override
public void begin() {
if (graph == null) {
graph = pipeline.getGraphDatabase();

if (graph == null)
throw new OTransformException(getName() + ": graph instance not set");

final OClass cls = graph.getEdgeType(edgeClass);
if (cls == null)
graph.createEdgeType(edgeClass);
}

final OClass cls = pipeline.getGraphDatabase().getEdgeType(edgeClass);
if (cls == null)
pipeline.getGraphDatabase().createEdgeType(edgeClass);
super.begin();
}

@Override
public Object executeTransform(final Object input) {
if (graph == null)
throw new OETLProcessHaltedException("Graph instance not found. Assure you have configured it in the Loader");

// GET JOIN VALUE
final OrientVertex vertex;
if (input instanceof OIdentifiable)
vertex = graph.getVertex(input);
else if (input instanceof OrientVertex)
if (input instanceof OrientVertex)
vertex = (OrientVertex) input;
else if (input instanceof OIdentifiable)
vertex = pipeline.getGraphDatabase().getVertex(input);
else
throw new OTransformException(getName() + ": input type '" + input + "' is not supported");

Expand All @@ -116,7 +101,7 @@ else if (input instanceof OrientVertex)
return input;
}

protected OrientEdge createEdge(OrientVertex vertex, Object joinCurrentValue, Object result) {
private OrientEdge createEdge(final OrientVertex vertex, final Object joinCurrentValue, Object result) {
log(OETLProcessor.LOG_LEVELS.DEBUG, "joinCurrentValue=%s, lookupResult=%s", joinCurrentValue, result);

if (result == null) {
Expand All @@ -125,7 +110,7 @@ protected OrientEdge createEdge(OrientVertex vertex, Object joinCurrentValue, Ob
case CREATE:
if (lookup != null) {
final String[] lookupParts = lookup.split("\\.");
final OrientVertex linkedV = graph.addTemporaryVertex(lookupParts[0]);
final OrientVertex linkedV = pipeline.getGraphDatabase().addTemporaryVertex(lookupParts[0]);
linkedV.setProperty(lookupParts[1], joinCurrentValue);
linkedV.save();

Expand All @@ -151,7 +136,7 @@ protected OrientEdge createEdge(OrientVertex vertex, Object joinCurrentValue, Ob
}

if (result != null) {
final OrientVertex targetVertex = graph.getVertex(result);
final OrientVertex targetVertex = pipeline.getGraphDatabase().getVertex(result);

// CREATE THE EDGE
final OrientEdge edge;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import com.orientechnologies.orient.etl.OETLProcessor;

public class OFieldTransformer extends OAbstractTransformer {
protected String fieldName;
protected String expression;
protected Object value;
protected boolean setOperation = true;
protected OSQLFilter sqlFilter;
protected boolean save = false;
private String fieldName;
private String expression;
private Object value;
private boolean setOperation = true;
private OSQLFilter sqlFilter;
private boolean save = false;

@Override
public ODocument getConfiguration() {
Expand Down Expand Up @@ -97,11 +97,8 @@ public Object executeTransform(final Object input) {
}

if (save) {
final ODatabaseDocumentTx db = super.pipeline.getDocumentDatabase();
if (db == null)
throw new OTransformException("Database instance not found in pipeline");

log(OETLProcessor.LOG_LEVELS.DEBUG, "saving record %s", doc);
final ODatabaseDocumentTx db = super.pipeline.getDocumentDatabase();
db.save(doc);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.orientechnologies.orient.etl.OETLProcessor;

public class OFlowTransformer extends OAbstractTransformer {
protected String operation;
private String operation;

@Override
public ODocument getConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
* Converts a JOIN in LINK
*/
public class OLinkTransformer extends OAbstractLookupTransformer {
protected String joinValue;
protected String linkFieldName;
protected OType linkFieldType;
private String joinValue;
private String linkFieldName;
private OType linkFieldType;

@Override
public ODocument getConfiguration() {
Expand Down Expand Up @@ -73,29 +73,28 @@ public String getName() {

@Override
public Object executeTransform(final Object input) {
ODocument doc;

if (!(input instanceof OIdentifiable)) {
log(OETLProcessor.LOG_LEVELS.DEBUG, "skip because input value is not a record, but rather an instance of class: %s", input.getClass());
return null;
}

doc = ((OIdentifiable) input).getRecord();

Object joinRuntimeValue = null;
final ODocument doc = ((OIdentifiable) input).getRecord();
final Object joinRuntimeValue;
if (joinFieldName != null)
joinRuntimeValue = doc.field(joinFieldName);
else if (joinValue != null)
joinRuntimeValue = resolve(joinValue);
else
joinRuntimeValue = null;

Object result;
if (OMultiValue.isMultiValue(joinRuntimeValue)) {
// RESOLVE SINGLE JOINS
result = new ArrayList<Object>();
final Collection<Object> singleJoinsResult = new ArrayList<Object>();
for (Object o : OMultiValue.getMultiValueIterable(joinRuntimeValue)) {
final Object r = lookup(o, true);
((List) result).add(r);
singleJoinsResult.add(lookup(o, true));
}
result = singleJoinsResult;
} else
result = lookup(joinRuntimeValue, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
* ETL Transformer that logs the input.
*/
public class OLogTransformer extends OAbstractTransformer {
final PrintStream out = System.out;
protected String prefix = "";
protected String postfix = "";
private final PrintStream out = System.out;
private String prefix = "";
private String postfix = "";

@Override
public ODocument getConfiguration() {
Expand Down Expand Up @@ -66,7 +66,7 @@ public Object executeTransform(final Object input) {
if (postfix != null && !postfix.isEmpty())
buffer.append(resolve(postfix));

out.println(buffer.toString());
out.println(buffer.toString()); //TODO log(OETLProcessor.LOG_LEVELS.INFO, buffer.toString());

return input;
}
Expand Down
Loading

0 comments on commit 3ebba91

Please sign in to comment.