Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do not merge] Appenderator stuff #1907

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ Void insertOrUpdate(
final byte[] value
) throws Exception;


byte[] lookup(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
);

void createDataSourceTable();

void createPendingSegmentsTable();

void createSegmentTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class MetadataStorageTablesConfig
{
public static MetadataStorageTablesConfig fromBase(String base)
{
return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null);
return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null, null);
}

public static final String TASK_ENTRY_TYPE = "task";
Expand All @@ -45,6 +45,9 @@ public static MetadataStorageTablesConfig fromBase(String base)
@JsonProperty("base")
private final String base;

@JsonProperty("dataSource")
private final String dataSourceTable;

@JsonProperty("pendingSegments")
private final String pendingSegmentsTable;

Expand Down Expand Up @@ -72,6 +75,7 @@ public static MetadataStorageTablesConfig fromBase(String base)
@JsonCreator
public MetadataStorageTablesConfig(
@JsonProperty("base") String base,
@JsonProperty("dataSource") String dataSourceTable,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list is getting kind of silly... we may want to consider a better way of handling this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

((not intended to be a blocker here))

@JsonProperty("pendingSegments") String pendingSegmentsTable,
@JsonProperty("segments") String segmentsTable,
@JsonProperty("rules") String rulesTable,
Expand All @@ -83,6 +87,7 @@ public MetadataStorageTablesConfig(
)
{
this.base = (base == null) ? DEFAULT_BASE : base;
this.dataSourceTable = makeTableName(dataSourceTable, "dataSource");
this.pendingSegmentsTable = makeTableName(pendingSegmentsTable, "pendingSegments");
this.segmentsTable = makeTableName(segmentsTable, "segments");
this.rulesTable = makeTableName(rulesTable, "rules");
Expand Down Expand Up @@ -115,6 +120,11 @@ public String getBase()
return base;
}

public String getDataSourceTable()
{
return dataSourceTable;
}

public String getPendingSegmentsTable()
{
return pendingSegmentsTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,20 @@ public void testIsTransientException() throws Exception
{
PostgreSQLConnector connector = new PostgreSQLConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null))
Suppliers.ofInstance(
new MetadataStorageTablesConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
);

Assert.assertTrue(connector.isTransientException(new SQLException("bummer, connection problem", "08DIE")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public String getPassword()
public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
{
return new MetadataStorageTablesConfig(
null,
null,
segmentTable,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public InputStream openStream() throws IOException
);
metadataStorageTablesConfigSupplier = derbyConnectorRule.metadataTablesConfigSupplier();
connector = derbyConnectorRule.getConnector();

try {
connector.getDBI().withHandle(
new HandleCallback<Void>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package io.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
Expand All @@ -42,15 +41,27 @@
*/
public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
{
@JsonIgnore
private final Set<DataSegment> segments;
private final Object oldCommitMetadata;
private final Object newCommitMetadata;

public SegmentInsertAction(
Set<DataSegment> segments
)
{
this(segments, null, null);
}

@JsonCreator
public SegmentInsertAction(
@JsonProperty("segments") Set<DataSegment> segments
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("oldCommitMetadata") Object oldCommitMetadata,
@JsonProperty("newCommitMetadata") Object newCommitMetadata
)
{
this.segments = ImmutableSet.copyOf(segments);
this.oldCommitMetadata = oldCommitMetadata;
this.newCommitMetadata = newCommitMetadata;
}

@JsonProperty
Expand All @@ -59,6 +70,18 @@ public Set<DataSegment> getSegments()
return segments;
}

@JsonProperty
public Object getOldCommitMetadata()
{
return oldCommitMetadata;
}

@JsonProperty
public Object getNewCommitMetadata()
{
return newCommitMetadata;
}

public TypeReference<Set<DataSegment>> getReturnTypeReference()
{
return new TypeReference<Set<DataSegment>>()
Expand All @@ -69,9 +92,18 @@ public TypeReference<Set<DataSegment>> getReturnTypeReference()
@Override
public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
// TODO: It's possible that we lose our locks after calling this. This should be OK if we're using commitMetadata.
// TODO: Although, of course, that's not always used...
toolbox.verifyTaskLocks(task, segments);

final Set<DataSegment> retVal = toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(segments);
// TODO: I'm pretty sure the attempt at transactionality is foiled by:
// TODO: - a zombie task can clobber a good segment on deep storage
// TODO: - announceHistoricalSegments will silently do nothing if one already exists with the same id
final Set<DataSegment> retVal = toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
segments,
oldCommitMetadata,
newCommitMetadata
);

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
Expand Down
Loading