-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[QTL] Implement LookupExtractorFactory of namespaced lookup #2926
Changes from 15 commits
14f4c88
7d75062
e4ae726
a525a5f
153925a
c5cc36a
3b45ae2
379af21
5b35e42
688e7c1
10fc4f8
f2b6864
00f42c1
4e91b13
c23e06d
1b3e6cc
8a77bc7
d780468
4216aa8
fe44182
4f89413
99e8ac2
4313a79
a7b35ce
2f97f9d
b0379b9
15dc879
cda32b3
ab2230c
f33ed53
7d5f681
e061eb6
423e392
25083a3
42bb4b2
ef0fab2
e772c5c
b2c7f96
bcccf12
db45e44
552114a
365d8f1
df6dfc4
e430113
6283dc2
c9db080
fa1c0c1
38ca68e
6762c91
2330549
15363e0
9900d99
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package io.druid.query.extraction; | ||
|
||
import com.fasterxml.jackson.annotation.JacksonInject; | ||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.fasterxml.jackson.annotation.JsonTypeName; | ||
import com.google.common.base.Preconditions; | ||
import com.metamx.common.ISE; | ||
import com.metamx.common.StringUtils; | ||
import com.metamx.common.logger.Logger; | ||
import io.druid.query.extraction.namespace.ExtractionNamespace; | ||
import io.druid.query.lookup.LookupExtractor; | ||
import io.druid.query.lookup.LookupExtractorFactory; | ||
import io.druid.query.lookup.LookupIntrospectHandler; | ||
import io.druid.server.namespace.cache.NamespaceExtractionCacheManager; | ||
|
||
import javax.annotation.Nullable; | ||
import javax.ws.rs.GET; | ||
import javax.ws.rs.Path; | ||
import javax.ws.rs.Produces; | ||
import javax.ws.rs.core.MediaType; | ||
import javax.ws.rs.core.Response; | ||
import java.nio.ByteBuffer; | ||
import java.util.Map; | ||
import java.util.UUID; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReadWriteLock; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
||
@JsonTypeName("namespace") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think we need to call this something that reflects that is is a global cached lookup manager. How about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sounds good will change |
||
public class NamespaceLookupExtractorFactory implements LookupExtractorFactory | ||
{ | ||
private static final Logger LOG = new Logger(NamespaceLookupExtractorFactory.class); | ||
|
||
private static long SCHEDULE_TIMEOUT = 60_000; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe make this configurable ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding |
||
|
||
private final AtomicBoolean started = new AtomicBoolean(false); | ||
private final ReadWriteLock startStopSync = new ReentrantReadWriteLock(); | ||
private final ExtractionNamespace extractionNamespace; | ||
private final NamespaceExtractionCacheManager manager; | ||
private final LookupIntrospectHandler lookupIntrospectHandler; | ||
|
||
private final String extractorID; | ||
|
||
@JsonCreator | ||
public NamespaceLookupExtractorFactory( | ||
@JsonProperty("extractionNamespace") ExtractionNamespace extractionNamespace, | ||
@JacksonInject NamespaceExtractionCacheManager manager | ||
) | ||
{ | ||
this.extractionNamespace = Preconditions.checkNotNull( | ||
extractionNamespace, | ||
"extractionNamespace should be specified" | ||
); | ||
this.manager = manager; | ||
this.extractorID = buildID(); | ||
this.lookupIntrospectHandler = new LookupIntrospectHandler() { | ||
@GET | ||
@Path("/keys") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we document this ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
@Produces(MediaType.APPLICATION_JSON) | ||
public Response getKeys() | ||
{ | ||
return Response.ok(getLatest().keySet().toString()).build(); | ||
} | ||
|
||
@GET | ||
@Path("/values") | ||
@Produces(MediaType.APPLICATION_JSON) | ||
public Response getValues() | ||
{ | ||
return Response.ok(getLatest().values().toString()).build(); | ||
} | ||
|
||
@GET | ||
@Produces(MediaType.APPLICATION_JSON) | ||
public Response getMap() | ||
{ | ||
return Response.ok(getLatest()).build(); | ||
} | ||
|
||
private Map<String, String> getLatest() | ||
{ | ||
return ((MapLookupExtractor)get()).getMap(); | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public boolean start() | ||
{ | ||
final Lock writeLock = startStopSync.writeLock(); | ||
writeLock.lock(); | ||
try { | ||
if (!started.compareAndSet(false, true)) { | ||
LOG.warn("Already started!"); | ||
return true; | ||
} | ||
if (!manager.scheduleAndWait(extractorID, extractionNamespace, SCHEDULE_TIMEOUT)) { | ||
LOG.warn("Failed to schedule lookup [%s]", extractorID); | ||
return false; | ||
} | ||
LOG.debug("NamespaceLookupExtractorFactory[%s] started", extractorID); | ||
return true; | ||
} | ||
finally { | ||
writeLock.unlock(); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean close() | ||
{ | ||
final Lock writeLock = startStopSync.writeLock(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wondering isn't better to use synchronized ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather not if possible. there's no reason to force the get() to be synchronized. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
writeLock.lock(); | ||
try { | ||
if (!started.compareAndSet(true, false)) { | ||
LOG.warn("Not started!"); | ||
return true; | ||
} | ||
return manager.checkedDelete(extractorID); | ||
} | ||
finally { | ||
writeLock.unlock(); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean replaces(@Nullable LookupExtractorFactory other) | ||
{ | ||
if (other != null && other instanceof NamespaceLookupExtractorFactory) { | ||
NamespaceLookupExtractorFactory that = (NamespaceLookupExtractorFactory) other; | ||
return !extractionNamespace.equals(that.extractionNamespace); | ||
} | ||
return true; | ||
} | ||
|
||
@Override | ||
public LookupIntrospectHandler getIntrospectHandler() | ||
{ | ||
return lookupIntrospectHandler; | ||
} | ||
|
||
@JsonProperty | ||
public ExtractionNamespace getExtractionNamespace() | ||
{ | ||
return extractionNamespace; | ||
} | ||
|
||
private String buildID() | ||
{ | ||
return UUID.randomUUID().toString(); | ||
} | ||
|
||
// Grab the latest snapshot from the cache manager | ||
@Override | ||
public LookupExtractor get() | ||
{ | ||
final Lock readLock = startStopSync.readLock(); | ||
readLock.lock(); | ||
try { | ||
if (!started.get()) { | ||
throw new ISE("Factory [%s] not started", extractorID); | ||
} | ||
String preVersion = null, postVersion = null; | ||
Map<String, String> map = null; | ||
// Make sure we absolutely know what version of map we grabbed (for caching purposes) | ||
do { | ||
preVersion = manager.getVersion(extractorID); | ||
if (preVersion == null) { | ||
throw new ISE("Namespace vanished for [%s]", extractorID); | ||
} | ||
map = manager.getCacheMap(extractorID); | ||
postVersion = manager.getVersion(extractorID); | ||
if (postVersion == null) { | ||
// We lost some horrible race... make sure we clean up | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't the read/write lock meant to prevent this ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, read/write lock is only for start/stop races. This is to try and help prevent a race in delete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need such a sophisticated lock just for simple start stop ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't want get to be subject to a coarse |
||
manager.delete(extractorID); | ||
throw new ISE("Lookup [%s] is deleting", extractorID); | ||
} | ||
} while (!preVersion.equals(postVersion)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i am totally lost, why the version will change between two calls in the same function ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. race condition during background update. If the updating task swaps the cache at a very inopportune time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but isn't when you get the read lock this can not happen ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The read lock is only for the entire service shutting down, not for individual namespaces |
||
final byte[] v = StringUtils.toUtf8(postVersion); | ||
final byte[] id = StringUtils.toUtf8(extractorID); | ||
return new MapLookupExtractor(map, false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i am assuming ppls would like to set the one to one var to true sometime. So it will be nice to add it to namesapce. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @drcrallen if added why this is false ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. because you're looking at an old diff? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
{ | ||
@Override | ||
public byte[] getCacheKey() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be ok if this is kept in cache while the swap happen ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, that is why the "version" is part of the key, so that the cache key is unique to this swap-in |
||
{ | ||
return ByteBuffer | ||
.allocate(id.length + 1 + v.length + 1) | ||
.put(id) | ||
.put((byte) 0xFF) | ||
.put(v) | ||
.put((byte) 0xFF) | ||
.array(); | ||
} | ||
}; | ||
} | ||
finally { | ||
readLock.unlock(); | ||
} | ||
} | ||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this version into the parent pom?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it need to? there are other extensions who have extension-specific (aka nowhere else in druid) library versions in their pom. Is there a reason why this one needs to be in parent pom?