Skip to content

Commit

Permalink
Clustering: fixed network management
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Jun 14, 2013
1 parent 8b42c1b commit ccd9b3b
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -671,9 +671,9 @@ protected ODocument sendRequest(final byte iRequest, final ODocument iPayLoad, f
storage.endRequest(network);
}

storage.beginResponse(network);
retry = false;
try {
storage.beginResponse(network);
return new ODocument(network.readBytes());
} finally {
storage.endResponse(network);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ public OIndexRebuildOutputListener(final OIndex<?> idx) {
public void onBegin(final Object iTask, final long iTotal) {
startTime = System.currentTimeMillis();
lastDump = startTime;
OLogManager.instance().debug(idx, "- Building index %s...", idx.getName());
OLogManager.instance().debug(this, "- Building index %s...", idx.getName());
}

@Override
public boolean onProgress(final Object iTask, final long iCounter, final float iPercent) {
final long now = System.currentTimeMillis();
if (now - lastDump > 10000) {
// DUMP EVERY 5 SECONDS FOR LARGE INDEXES
OLogManager.instance().debug(idx, "--> %3.2f%% progress, %,d indexed so far (%,d items/sec)", iPercent, iCounter,
OLogManager.instance().debug(this, "--> %3.2f%% progress, %,d indexed so far (%,d items/sec)", iPercent, iCounter,
((iCounter - lastCounter) / 10));
lastDump = now;
lastCounter = iCounter;
Expand All @@ -56,7 +56,7 @@ public boolean onProgress(final Object iTask, final long iCounter, final float i

@Override
public void onCompletition(final Object iTask, final boolean iSucceed) {
OLogManager.instance().debug(idx, "--> OK, indexed %,d items in %,d ms", idx.getSize(),
OLogManager.instance().debug(this, "--> OK, indexed %,d items in %,d ms", idx.getSize(),
(System.currentTimeMillis() - startTime));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ else if (!lockRead.tryLock(iTimeout, TimeUnit.MILLISECONDS))
iRequesterId, currentSessionId);

if (iTimeout > 0 && (System.currentTimeMillis() - startClock) > iTimeout) {
// CLOSE THE SOCKET TO CHANNEL TO AVOID FURTHER DIRTY DATA
// CLOSE THE SOCKET TO CHANNEL TO AVOID FURTHER DIRTY DATA
close();
throw new OTimeoutException("Timeout on reading response from the server for the request " + iRequesterId);
}
Expand Down Expand Up @@ -159,9 +159,16 @@ public void endResponse() {

try {
readCondition.signalAll();
} catch (IllegalMonitorStateException e) {
// IGNORE IT
OLogManager.instance().debug(this, "Error on signaling waiting clients after reading response");
}

try {
lockRead.unlock();
} catch (IllegalMonitorStateException e) {
// IGNORE IT
OLogManager.instance().debug(this, "Error on unlocking network channel after reading response");
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/config/orientdb-server-log.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ handlers = java.util.logging.ConsoleHandler, java.util.logging.FileHandler
# Set the default logging level for the root logger
.level = ALL
com.orientechnologies.orient.server.distributed.level = FINE
com.orientechnologies.orient.core.index.OIndexRebuildOutputListener = FINE
com.orientechnologies.orient.core.index.OIndexRebuildOutputListener.level = FINE

# Set the default logging level for new ConsoleHandler instances
java.util.logging.ConsoleHandler.level = INFO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,21 +262,23 @@ public void pushDistribCfg2Clients(final ODocument iConfig) {
final ONetworkProtocolBinary p = (ONetworkProtocolBinary) c.protocol;
final OChannelBinary channel = (OChannelBinary) p.getChannel();

channel.acquireExclusiveLock();
try {
channel.writeByte(OChannelBinaryProtocol.PUSH_DATA);
channel.writeInt(Integer.MIN_VALUE);
channel.writeByte(OChannelBinaryProtocol.REQUEST_PUSH_DISTRIB_CONFIG);
channel.writeBytes(content);
channel.flush();

pushed.add(c.getRemoteAddress());
OLogManager.instance().info(this, "Sent updated cluster configuration to the remote client %s", c.getRemoteAddress());

channel.acquireExclusiveLock();
try {
channel.writeByte(OChannelBinaryProtocol.PUSH_DATA);
channel.writeInt(Integer.MIN_VALUE);
channel.writeByte(OChannelBinaryProtocol.REQUEST_PUSH_DISTRIB_CONFIG);
channel.writeBytes(content);
channel.flush();

pushed.add(c.getRemoteAddress());
OLogManager.instance().info(this, "Sent updated cluster configuration to the remote client %s", c.getRemoteAddress());

} finally {
channel.releaseExclusiveLock();
}
} catch (IOException e) {
OLogManager.instance().warn(this, "Cannot push cluster configuration to client %s", c.getRemoteAddress());
} finally {
channel.releaseExclusiveLock();
OLogManager.instance().warn(this, "Cannot push cluster configuration to the client %s", c.getRemoteAddress());
}
}

Expand Down Expand Up @@ -306,15 +308,23 @@ public void pushRecord2Clients(final ORecordInternal<?> iRecord, final OClientCo

if (c.database != null && c.database.getName().equals(dbName))
synchronized (c) {
channel.acquireExclusiveLock();
try {
channel.writeByte(OChannelBinaryProtocol.PUSH_DATA);
channel.writeInt(Integer.MIN_VALUE);
channel.writeByte(OChannelBinaryProtocol.REQUEST_PUSH_RECORD);
p.writeIdentifiable(iRecord);
} finally {
channel.releaseExclusiveLock();

channel.acquireExclusiveLock();
try {

channel.writeByte(OChannelBinaryProtocol.PUSH_DATA);
channel.writeInt(Integer.MIN_VALUE);
channel.writeByte(OChannelBinaryProtocol.REQUEST_PUSH_RECORD);
p.writeIdentifiable(iRecord);

} finally {
channel.releaseExclusiveLock();
}
} catch (IOException e) {
OLogManager.instance().warn(this, "Cannot push record to the client %s", c.getRemoteAddress());
}

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,20 @@ public ODistributedStorage(final ODistributedServerManager iCluster, final OStor
}

public Object command(final OCommandRequestText iCommand) {

final OCommandExecutor executor = OCommandManager.instance().getExecutor(iCommand);

executor.setProgressListener(iCommand.getProgressListener());
executor.parse(iCommand);

final boolean distribute;
boolean distribute = false;
final OCommandExecutor exec = executor instanceof OCommandExecutorSQLDelegate ? ((OCommandExecutorSQLDelegate) executor)
.getDelegate() : executor;
if (exec instanceof OCommandDistributedConditionalReplicateRequest)
distribute = ((OCommandDistributedConditionalReplicateRequest) exec).isReplicated();
else if (exec instanceof OCommandDistributedReplicateRequest)
distribute = true;
else
distribute = false;

if (!ODistributedThreadLocal.INSTANCE.distributedExecution)
if (exec instanceof OCommandDistributedConditionalReplicateRequest)
distribute = ((OCommandDistributedConditionalReplicateRequest) exec).isReplicated();
else if (exec instanceof OCommandDistributedReplicateRequest)
distribute = true;

if (distribute)
ODistributedThreadLocal.INSTANCE.distributedExecution = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public OAlignRequestDistributedTask(final String nodeSource, final String iDbNam

@Override
public Integer call() throws Exception {
OLogManager.instance().warn(this, "DISTRIBUTED <-[%s/%s] align request starting from %d.%d", nodeSource, databaseName,
OLogManager.instance().info(this, "DISTRIBUTED <-[%s/%s] align request starting from %d.%d", nodeSource, databaseName,
lastRunId, lastOperationId);

int aligned;
Expand All @@ -78,18 +78,18 @@ public Integer call() throws Exception {
final List<Long> positions = new ArrayList<Long>();

Iterator<Long> it = log.browse(new long[] { lastRunId, lastOperationId });
//for (Iterator<Long> it = log.browse(new long[] { lastRunId, lastOperationId }); it.hasNext();) {
while(it.hasNext()) {
// for (Iterator<Long> it = log.browse(new long[] { lastRunId, lastOperationId }); it.hasNext();) {
while (it.hasNext()) {
final long pos = it.next();

final OAbstractDistributedTask<?> operation = log.getOperation(pos);
if (operation == null) {
OLogManager.instance().warn(this, "DISTRIBUTED ->[%s/%s] skipped operation #%d.%d", nodeSource, databaseName,
OLogManager.instance().info(this, "DISTRIBUTED ->[%s/%s] skipped operation #%d.%d", nodeSource, databaseName,
lastRunId, lastOperationId);
continue;
}

OLogManager.instance().warn(this, "DISTRIBUTED ->[%s/%s] operation %s", nodeSource, databaseName, operation);
OLogManager.instance().info(this, "DISTRIBUTED ->[%s/%s] operation %s", nodeSource, databaseName, operation);

operation.setNodeSource(localNode);
operation.setDatabaseName(databaseName);
Expand All @@ -105,7 +105,7 @@ public Integer call() throws Exception {
if (tasks.getTasks() > 0)
aligned += flushBufferedTasks(dManager, synchronizer, tasks, positions);

OLogManager.instance().warn(this, "DISTRIBUTED ->[%s/%s] aligned %d operations", nodeSource, databaseName, aligned);
OLogManager.instance().info(this, "DISTRIBUTED ->[%s/%s] aligned %d operations", nodeSource, databaseName, aligned);
} finally {
alignmentLock.unlock();
}
Expand Down

0 comments on commit ccd9b3b

Please sign in to comment.