Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QTL] Implement LookupExtractorFactory of namespaced lookup #2926

Merged
merged 52 commits into from
May 24, 2016
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
14f4c88
support LookupReferencesManager registration of namespaced lookup and…
sirpkt Mar 24, 2016
7d75062
update KafkaExtractionNamespaceTest to reflect argument signature cha…
sirpkt Mar 31, 2016
e4ae726
Add more synchronization functionality to NamespaceLookupExtractorFac…
drcrallen Apr 7, 2016
a525a5f
Remove old way of using extraction namespaces
drcrallen Apr 7, 2016
153925a
Merge remote-tracking branch 'upstream/master' into qtl_namespace_lookup
sirpkt Apr 8, 2016
c5cc36a
Merge branch 'namespaceLookupMovetoLookups' of https://github.com/met…
sirpkt Apr 8, 2016
3b45ae2
Merge branch 'metamx-namespaceLookupMovetoLookups' into qtl_namespace…
sirpkt Apr 8, 2016
379af21
Merge remote-tracking branch 'upstream/master' into qtl_namespace_lookup
sirpkt Apr 27, 2016
5b35e42
resolve compile error by supporting LookupIntrospectHandler
sirpkt Apr 27, 2016
688e7c1
Merge remote-tracking branch 'druid/master' into mergeMasterLookups
drcrallen May 2, 2016
10fc4f8
Merge pull request #2 from metamx/mergeMasterLookups
sirpkt May 3, 2016
f2b6864
Merge remote-tracking branch 'upstream/master' into qtl_namespace_lookup
sirpkt May 3, 2016
00f42c1
Remove kafka lookups
drcrallen May 5, 2016
4e91b13
Remove unused stuff
drcrallen May 5, 2016
c23e06d
Fix start and stop behavior to be consistent with new javadocs
drcrallen May 5, 2016
1b3e6cc
Remove unused strings
drcrallen May 6, 2016
8a77bc7
Add timeout option
drcrallen May 6, 2016
d780468
Address comments on configurations and improve docs
drcrallen May 6, 2016
4216aa8
Add more options and update hash key and replaces
drcrallen May 6, 2016
fe44182
Move monitoring to the overriding classes
drcrallen May 6, 2016
4f89413
Add better start/stop logging
drcrallen May 6, 2016
99e8ac2
Remove old docs about namespace names
drcrallen May 6, 2016
4313a79
Fix bad comma
drcrallen May 6, 2016
a7b35ce
Add `@JsonIgnore` to lookup factory
drcrallen May 10, 2016
2f97f9d
Merge remote-tracking branch 'druid/master' into qtl_namespace_lookup…
drcrallen May 10, 2016
b0379b9
Address code review comments
drcrallen May 11, 2016
15dc879
Remove ExtractionNamespace from module json registration
drcrallen May 11, 2016
cda32b3
Merge remote-tracking branch 'druid/master' into qtl_namespace_lookup…
drcrallen May 11, 2016
ab2230c
Fix problems with naming and initialization. Add tests
drcrallen May 12, 2016
f33ed53
Optimize imports / reformat
drcrallen May 12, 2016
7d5f681
Fix future not being properly cancelled on failed initial scheduling
drcrallen May 13, 2016
e061eb6
Fix delete returns
drcrallen May 13, 2016
423e392
Add more docs about whole introspection
drcrallen May 16, 2016
25083a3
Add `/version` introspection point for lookups
drcrallen May 17, 2016
42bb4b2
Add more tests and address comments
drcrallen May 18, 2016
ef0fab2
Add StaticMap extraction namespace for testing. Also add a bunch of t…
drcrallen May 18, 2016
e772c5c
Move cache system property to `druid.lookup.namespace.cache.type`
drcrallen May 18, 2016
b2c7f96
Make VERSION lower case
drcrallen May 18, 2016
bcccf12
Change poll period to 0ms for StaticMap
drcrallen May 18, 2016
db45e44
Move cache key to bytebuffer
drcrallen May 19, 2016
552114a
Change hashCode and equals on static map extraction fn
drcrallen May 19, 2016
365d8f1
Add more comments on StaticMap
drcrallen May 19, 2016
df6dfc4
Address comments
drcrallen May 20, 2016
e430113
Make scheduleAndWait use a latch
drcrallen May 20, 2016
6283dc2
Sanity renames and fix imports
drcrallen May 20, 2016
c9db080
Remove extra info in docs
drcrallen May 20, 2016
fa1c0c1
Fix review comments
drcrallen May 20, 2016
38ca68e
Strengthen failure on start from warn to error
drcrallen May 20, 2016
6762c91
Address comments
drcrallen May 21, 2016
2330549
Merge remote-tracking branch 'druid/master' into qtl_namespace_lookup…
drcrallen May 23, 2016
15363e0
Rename namespace-lookup to lookups-cached-global
drcrallen May 23, 2016
9900d99
Fix injective mis-naming
drcrallen May 24, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions extensions-core/namespace-lookup/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,10 @@
<version>3.0.1</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this version into the parent pom?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to? there are other extensions who have extension-specific (aka nowhere else in druid) library versions in their pom. Is there a reason why this one needs to be in parent pom?

<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.logger.Logger;
import io.druid.query.extraction.namespace.ExtractionNamespace;
import io.druid.query.lookup.LookupExtractor;
import io.druid.query.lookup.LookupExtractorFactory;
import io.druid.query.lookup.LookupIntrospectHandler;
import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;

import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@JsonTypeName("namespace")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we need to call this something that reflects that is is a global cached lookup manager. How about CachedNamespace ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good will change

public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
{
private static final Logger LOG = new Logger(NamespaceLookupExtractorFactory.class);

private static long SCHEDULE_TIMEOUT = 60_000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe make this configurable ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding


private final AtomicBoolean started = new AtomicBoolean(false);
private final ReadWriteLock startStopSync = new ReentrantReadWriteLock();
private final ExtractionNamespace extractionNamespace;
private final NamespaceExtractionCacheManager manager;
private final LookupIntrospectHandler lookupIntrospectHandler;

private final String extractorID;

@JsonCreator
public NamespaceLookupExtractorFactory(
@JsonProperty("extractionNamespace") ExtractionNamespace extractionNamespace,
@JacksonInject NamespaceExtractionCacheManager manager
)
{
this.extractionNamespace = Preconditions.checkNotNull(
extractionNamespace,
"extractionNamespace should be specified"
);
this.manager = manager;
this.extractorID = buildID();
this.lookupIntrospectHandler = new LookupIntrospectHandler() {
@GET
@Path("/keys")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we document this ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Produces(MediaType.APPLICATION_JSON)
public Response getKeys()
{
return Response.ok(getLatest().keySet().toString()).build();
}

@GET
@Path("/values")
@Produces(MediaType.APPLICATION_JSON)
public Response getValues()
{
return Response.ok(getLatest().values().toString()).build();
}

@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getMap()
{
return Response.ok(getLatest()).build();
}

private Map<String, String> getLatest()
{
return ((MapLookupExtractor)get()).getMap();
}
};
}

@Override
public boolean start()
{
final Lock writeLock = startStopSync.writeLock();
writeLock.lock();
try {
if (!started.compareAndSet(false, true)) {
LOG.warn("Already started!");
return true;
}
if (!manager.scheduleAndWait(extractorID, extractionNamespace, SCHEDULE_TIMEOUT)) {
LOG.warn("Failed to schedule lookup [%s]", extractorID);
return false;
}
LOG.debug("NamespaceLookupExtractorFactory[%s] started", extractorID);
return true;
}
finally {
writeLock.unlock();
}
}

@Override
public boolean close()
{
final Lock writeLock = startStopSync.writeLock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering isn't better to use synchronized ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not if possible. there's no reason to force the get() to be synchronized.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

writeLock.lock();
try {
if (!started.compareAndSet(true, false)) {
LOG.warn("Not started!");
return true;
}
return manager.checkedDelete(extractorID);
}
finally {
writeLock.unlock();
}
}

@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
if (other != null && other instanceof NamespaceLookupExtractorFactory) {
NamespaceLookupExtractorFactory that = (NamespaceLookupExtractorFactory) other;
return !extractionNamespace.equals(that.extractionNamespace);
}
return true;
}

@Override
public LookupIntrospectHandler getIntrospectHandler()
{
return lookupIntrospectHandler;
}

@JsonProperty
public ExtractionNamespace getExtractionNamespace()
{
return extractionNamespace;
}

private String buildID()
{
return UUID.randomUUID().toString();
}

// Grab the latest snapshot from the cache manager
@Override
public LookupExtractor get()
{
final Lock readLock = startStopSync.readLock();
readLock.lock();
try {
if (!started.get()) {
throw new ISE("Factory [%s] not started", extractorID);
}
String preVersion = null, postVersion = null;
Map<String, String> map = null;
// Make sure we absolutely know what version of map we grabbed (for caching purposes)
do {
preVersion = manager.getVersion(extractorID);
if (preVersion == null) {
throw new ISE("Namespace vanished for [%s]", extractorID);
}
map = manager.getCacheMap(extractorID);
postVersion = manager.getVersion(extractorID);
if (postVersion == null) {
// We lost some horrible race... make sure we clean up
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the read/write lock meant to prevent this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, read/write lock is only for start/stop races. This is to try and help prevent a race in delete.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need such a sophisticated lock just for simple start stop ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want get to be subject to a coarse synchronize so this was the next best thing.

manager.delete(extractorID);
throw new ISE("Lookup [%s] is deleting", extractorID);
}
} while (!preVersion.equals(postVersion));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am totally lost, why the version will change between two calls in the same function ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

race condition during background update. If the updating task swaps the cache at a very inopportune time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but isn't when you get the read lock this can not happen ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The read lock is only for the entire service shutting down, not for individual namespaces

final byte[] v = StringUtils.toUtf8(postVersion);
final byte[] id = StringUtils.toUtf8(extractorID);
return new MapLookupExtractor(map, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am assuming ppls would like to set the one to one var to true sometime. So it will be nice to add it to namesapce.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drcrallen if added why this is false ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because you're looking at an old diff?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
@Override
public byte[] getCacheKey()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be ok if this is kept in cache while the swap happen ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, that is why the "version" is part of the key, so that the cache key is unique to this swap-in

{
return ByteBuffer
.allocate(id.length + 1 + v.length + 1)
.put(id)
.put((byte) 0xFF)
.put(v)
.put((byte) 0xFF)
.array();
}
};
}
finally {
readLock.unlock();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,5 @@
*/
public interface ExtractionNamespace
{
/**
* This is expected to return the namespace name. As an additional requirement, the implementation MUST supply a
* "namespace" field in the json representing the object which is equal to the return of this function
* @return The name of the namespace
*/
String getNamespace();
long getPollMs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,8 @@
/**
*
*/
public interface ExtractionNamespaceFunctionFactory<T extends ExtractionNamespace>
public interface ExtractionNamespaceCacheFactory<T extends ExtractionNamespace>
{

/**
* Create a function for the given namespace which will do the manipulation requested in the extractionNamespace.
* A simple implementation would simply use the cache supplied by the `NamespaceExtractionCacheManager`.
* More advanced implementations may need more than just what can be cached by `NamespaceExtractionCacheManager`.
*
* @param extractionNamespace The ExtractionNamespace for which a manipulating function is needed.
*
* @return A function which will perform an extraction in accordance with the desires of the ExtractionNamespace
*/
Function<String, String> buildFn(T extractionNamespace, Map<String, String> cache);


/**
* @param extractionNamespace The ExtractionNamespace for which a manipulating reverse function is needed.
* @param cache view of the cache containing the function mapping.
*
* @return A function that will perform reverse lookup.
*/
Function<String, List<String>> buildReverseFn(T extractionNamespace, final Map<String, String> cache);

/**
* This function is called once if `ExtractionNamespace.getUpdateMs() == 0`, or every update if
* `ExtractionNamespace.getUpdateMs() > 0`
Expand All @@ -60,6 +39,7 @@ public interface ExtractionNamespaceFunctionFactory<T extends ExtractionNamespac
* initialize resources.
* If the result of the Callable is the same as what is passed in as lastVersion, then no swap takes place, and the swap is discarded.
*
* @param id The ID of ExtractionNamespace
* @param extractionNamespace The ExtractionNamespace for which to populate data.
* @param lastVersion The version which was last cached
* @param swap The temporary Map into which data may be placed and will be "swapped" with the proper
Expand All @@ -70,5 +50,5 @@ public interface ExtractionNamespaceFunctionFactory<T extends ExtractionNamespac
* @return A callable that will be used to refresh resources of the namespace and return the version string used in
* the populating
*/
Callable<String> getCachePopulator(T extractionNamespace, String lastVersion, Map<String, String> swap);
Callable<String> getCachePopulator(String id, T extractionNamespace, String lastVersion, Map<String, String> swap);
}
Loading