Skip to content

Commit

Permalink
Fix safeLock not run when interrupt & Fix partial notification being …
Browse files Browse the repository at this point in the history
…cancelled in ServiceInstancesChangedListener (#14730)

* Fix safeLock not run when interrupt & Clear interrupted status for MetadataUtils

* Fix safeLock not run when interrupt & Clear interrupted status for MetadataUtils

* Fix safeLock not run when interrupt & Clear interrupted status for MetadataUtils

* Fix notify

* Fix style

* Fix ut
  • Loading branch information
AlbumenJ authored Sep 30, 2024
1 parent b2eba82 commit 86c1a9d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,24 @@ public class LockUtils {

public static void safeLock(Lock lock, int timeout, Runnable runnable) {
try {
if (!lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
logger.error(
LoggerCodeConstants.INTERNAL_ERROR,
"",
"",
"Try to lock failed, timeout: " + timeout,
new TimeoutException());
boolean interrupted = false;
try {
if (!lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
logger.error(
LoggerCodeConstants.INTERNAL_ERROR,
"",
"",
"Try to lock failed, timeout: " + timeout,
new TimeoutException());
}
} catch (InterruptedException e) {
logger.warn(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Try to lock failed", e);
interrupted = true;
}
runnable.run();
} catch (InterruptedException e) {
logger.warn(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Try to lock failed", e);
Thread.currentThread().interrupt();
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
try {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void testInterrupt() {
thread.interrupt();
await().until(() -> thread.getState() == State.TERMINATED);

Assertions.assertFalse(locked.get());
Assertions.assertTrue(locked.get());

reentrantLock.unlock();
}
Expand All @@ -141,4 +141,24 @@ void testHoldLock() throws InterruptedException {
Assertions.assertTrue(lockTime.get() - startTime > 1000);
Assertions.assertTrue(lockTime.get() - startTime < 10000);
}

@RepeatedTest(5)
void testInterrupted() throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();

AtomicLong lockTime = new AtomicLong(0);
long startTime = System.currentTimeMillis();
Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
LockUtils.safeLock(reentrantLock, 10000, () -> {
lockTime.set(System.currentTimeMillis());
});
});
thread.start();

await().until(() -> thread.getState() == State.TERMINATED);
Assertions.assertTrue(lockTime.get() >= startTime);
Assertions.assertTrue(lockTime.get() - startTime < 10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,28 +182,8 @@ private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
}

int emptyNum = hasEmptyMetadata(revisionToInstances);
if (emptyNum != 0) { // retry every 10 seconds
if (emptyNum != 0) {
hasEmptyMetadata = true;
if (retryPermission.tryAcquire()) {
if (retryFuture != null && !retryFuture.isDone()) {
// cancel last retryFuture because only one retryFuture will be canceled at destroy().
retryFuture.cancel(true);
}
try {
retryFuture = scheduler.schedule(
new AddressRefreshRetryTask(retryPermission, event.getServiceName()),
10_000L,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error(
INTERNAL_ERROR,
"unknown error in registry module",
"",
"Error submitting async retry task.");
}
logger.warn(
INTERNAL_ERROR, "unknown error in registry module", "", "Address refresh try task submitted");
}

// return if all metadata is empty, this notification will not take effect.
if (emptyNum == revisionToInstances.size()) {
Expand All @@ -214,10 +194,12 @@ private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
"",
"Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event.");

submitRetryTask(event);
return;
}
} else {
hasEmptyMetadata = false;
}
hasEmptyMetadata = false;

Map<String, Map<Integer, Map<Set<String>, Object>>> protocolRevisionsToUrls = new HashMap<>();
Map<String, List<ProtocolServiceKeyWithUrls>> newServiceUrls = new HashMap<>();
Expand All @@ -241,6 +223,30 @@ private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {

this.serviceUrls = newServiceUrls;
this.notifyAddressChanged();

if (hasEmptyMetadata) {
submitRetryTask(event);
}
}

private void submitRetryTask(ServiceInstancesChangedEvent event) {
// retry every 10 seconds
if (retryPermission.tryAcquire()) {
if (retryFuture != null && !retryFuture.isDone()) {
// cancel last retryFuture because only one retryFuture will be canceled at destroy().
retryFuture.cancel(true);
}
try {
retryFuture = scheduler.schedule(
new AddressRefreshRetryTask(retryPermission, event.getServiceName()),
10_000L,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error(
INTERNAL_ERROR, "unknown error in registry module", "", "Error submitting async retry task.");
}
logger.warn(INTERNAL_ERROR, "unknown error in registry module", "", "Address refresh try task submitted");
}
}

public synchronized void addListenerAndNotify(URL url, NotifyListener listener) {
Expand Down

0 comments on commit 86c1a9d

Please sign in to comment.