Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1088 from zalando/delete_et_if_admin
Browse files Browse the repository at this point in the history
Allow admins to delete event types
  • Loading branch information
Lionel Montrieux committed Oct 1, 2019
2 parents fb39c77 + d202748 commit 41fd11a
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class KafkaRepositoryAT extends BaseAT {
private static final String DEFAULT_ADMIN_VALUE = "nakadi";
private static final String DEFAULT_WARN_ALL_DATA_ACCESS_MESSAGE = "";
private static final String DEFAULT_WARN_LOG_COMPACTION_MESSAGE = "";
private static final String DEFAULT_EVENT_TYPE_DELETABLE_SUBSCRIPTION_OWNING_APPLICATION = "nakadi_archiver";
private static final String DEFAULT_EVENT_TYPE_DELETABLE_SUBSCRIPTION_CONSUMER_GROUP = "nakadi_to_s3";

private NakadiSettings nakadiSettings;
private KafkaSettings kafkaSettings;
Expand All @@ -93,7 +95,9 @@ public void setup() {
DEFAULT_ADMIN_DATA_TYPE,
DEFAULT_ADMIN_VALUE,
DEFAULT_WARN_ALL_DATA_ACCESS_MESSAGE,
DEFAULT_WARN_LOG_COMPACTION_MESSAGE);
DEFAULT_WARN_LOG_COMPACTION_MESSAGE,
DEFAULT_EVENT_TYPE_DELETABLE_SUBSCRIPTION_OWNING_APPLICATION,
DEFAULT_EVENT_TYPE_DELETABLE_SUBSCRIPTION_CONSUMER_GROUP);

kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
KAFKA_LINGER_MS, KAFKA_ENABLE_AUTO_COMMIT, KAFKA_MAX_REQUEST_SIZE,
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/org/zalando/nakadi/config/NakadiSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class NakadiSettings {
private final AuthorizationAttribute defaultAdmin;
private final String warnAllDataAccessMessage;
private final String logCompactionWarnMessage;
private final String deletableSubscriptionOwningApplication;
private final String deletableSubscriptionConsumerGroup;

@Autowired
public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTopicPartitionCount,
Expand All @@ -39,7 +41,11 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
@Value("${nakadi.admin.default.dataType}") final String defaultAdminDataType,
@Value("${nakadi.admin.default.value}") final String defaultAdminValue,
@Value("${nakadi.authz.warnAllDataAccessMessage}") final String warnAllDataAccessMessage,
@Value("${nakadi.topic.compacted.warnMessage}") final String logCompactionWarnMessage) {
@Value("${nakadi.topic.compacted.warnMessage}") final String logCompactionWarnMessage,
@Value("${nakadi.eventType.deletableSubscription.owningApplication}")
final String deletableSubscriptionOwningApplication,
@Value("${nakadi.eventType.deletableSubscription.consumerGroup}")
final String deletableSubscriptionConsumerGroup) {
this.maxTopicPartitionCount = maxTopicPartitionCount;
this.defaultTopicPartitionCount = defaultTopicPartitionCount;
this.defaultTopicReplicaFactor = defaultTopicReplicaFactor;
Expand All @@ -54,6 +60,8 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
this.defaultAdmin = new ResourceAuthorizationAttribute(defaultAdminDataType, defaultAdminValue);
this.warnAllDataAccessMessage = warnAllDataAccessMessage;
this.logCompactionWarnMessage = logCompactionWarnMessage;
this.deletableSubscriptionOwningApplication = deletableSubscriptionOwningApplication;
this.deletableSubscriptionConsumerGroup = deletableSubscriptionConsumerGroup;
}

public int getDefaultTopicPartitionCount() {
Expand Down Expand Up @@ -111,4 +119,12 @@ public String getWarnAllDataAccessMessage() {
public String getLogCompactionWarnMessage() {
return logCompactionWarnMessage;
}

public String getDeletableSubscriptionOwningApplication() {
return deletableSubscriptionOwningApplication;
}

public String getDeletableSubscriptionConsumerGroup() {
return deletableSubscriptionConsumerGroup;
}
}
29 changes: 28 additions & 1 deletion src/main/java/org/zalando/nakadi/service/EventTypeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.stream.Collectors;

import static org.zalando.nakadi.service.FeatureToggleService.Feature.DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.EVENT_TYPE_DELETION_ONLY_ADMINS;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.FORCE_EVENT_TYPE_AUTHZ;

@Component
Expand Down Expand Up @@ -249,7 +250,14 @@ public void delete(final String eventTypeName) throws EventTypeDeletionException
authorizationValidator.authorizeEventTypeView(eventType);
authorizationValidator.authorizeEventTypeAdmin(eventType);

if (featureToggleService.isFeatureEnabled(DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS)) {
if (featureToggleService.isFeatureEnabled(EVENT_TYPE_DELETION_ONLY_ADMINS)) {
if (eventType.getAuthorization() == null || hasNonDeletableSubscriptions(eventType.getName())) {
throw new AccessDeniedException(eventType.asResource());
}
}

if (featureToggleService.isFeatureEnabled(DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS)
|| featureToggleService.isFeatureEnabled(EVENT_TYPE_DELETION_ONLY_ADMINS)) {
topicsToDelete = deleteEventTypeWithSubscriptions(eventTypeName);
} else {
topicsToDelete = deleteEventTypeIfNoSubscriptions(eventTypeName);
Expand Down Expand Up @@ -333,6 +341,25 @@ private boolean hasSubscriptions(final String eventTypeName) {
return !subs.isEmpty();
}

private boolean hasNonDeletableSubscriptions(final String eventTypeName) {
int offset = 0;
List<Subscription> subs = subscriptionRepository.listSubscriptions(
ImmutableSet.of(eventTypeName), Optional.empty(), offset, 20);
while (!subs.isEmpty()) {
for (final Subscription sub : subs) {
if (!sub.getConsumerGroup().equals(nakadiSettings.getDeletableSubscriptionConsumerGroup())
|| !sub.getOwningApplication()
.equals(nakadiSettings.getDeletableSubscriptionOwningApplication())) {
return true;
}
}
offset += 20;
subs = subscriptionRepository.listSubscriptions(
ImmutableSet.of(eventTypeName), Optional.empty(), offset, 20);
}
return false;
}

public void update(final String eventTypeName,
final EventTypeBase eventTypeBase)
throws TopicConfigException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ enum Feature {
DISABLE_EVENT_TYPE_CREATION("disable_event_type_creation"),
DISABLE_EVENT_TYPE_DELETION("disable_event_type_deletion"),
DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS("delete_event_type_with_subscriptions"),
EVENT_TYPE_DELETION_ONLY_ADMINS("event_type_deletion_only_admins"),
DISABLE_SUBSCRIPTION_CREATION("disable_subscription_creation"),
REMOTE_TOKENINFO("remote_tokeninfo"),
KPI_COLLECTION("kpi_collection"),
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ nakadi:
dataType: service
value: stups_nakadi
authz.warnAllDataAccessMessage: "Data access warning"
eventType.deletableSubscription:
owningApplication: "nakadi_archiver"
consumerGroup: "nakadi_to_s3"
topic:
min:
retentionMs: 10800000 # 3 hours
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void init() throws Exception {
final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60,
NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, 0, NAKADI_EVENT_MAX_BYTES,
NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "nakadi", "I am warning you",
"I am warning you, even more");
"I am warning you, even more", "nakadi_archiver", "nakadi_to_s3");
final PartitionsCalculator partitionsCalculator = new KafkaConfig().createPartitionsCalculator(
"t2.large", TestUtils.OBJECT_MAPPER, nakadiSettings);
when(timelineService.getTopicRepository((Timeline) any())).thenReturn(topicRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class EventPublisherTest {
private final AuthorizationValidator authzValidator = mock(AuthorizationValidator.class);
private final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60,
NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, TIMELINE_WAIT_TIMEOUT_MS, NAKADI_EVENT_MAX_BYTES,
NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "nakadi", "", "");
NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "nakadi", "", "", "nakadi_archiver", "nakadi_to_s3");
private final EventPublisher publisher;

public EventPublisherTest() {
Expand Down
80 changes: 80 additions & 0 deletions src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.assertj.core.util.Lists;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -13,6 +14,7 @@
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.enrichment.Enrichment;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.ConflictException;
import org.zalando.nakadi.exceptions.runtime.EventTypeDeletionException;
import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException;
Expand Down Expand Up @@ -43,7 +45,9 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType;
import static org.zalando.nakadi.utils.TestUtils.buildResourceAuthorization;
import static org.zalando.nakadi.utils.TestUtils.checkKPIEventSubmitted;
import static org.zalando.nakadi.utils.TestUtils.createSubscription;

public class EventTypeServiceTest {

Expand Down Expand Up @@ -134,6 +138,82 @@ public void testFeatureToggleAllowsDeleteEventTypeWithSubscriptions() throws Exc
// no exception should be thrown
}

@Test
public void testFeatureToggleAllowsDeletEventTypeWithAuthzSectionAndDeletableSubscription() throws Exception {
final EventType eventType = buildDefaultEventType();
eventType.setAuthorization(buildResourceAuthorization());

doReturn(Optional.of(eventType)).when(eventTypeRepository).findByNameO(eventType.getName());
doReturn(ImmutableList.of(createSubscription("nakadi_archiver", "nakadi_to_s3")))
.when(subscriptionDbRepository)
.listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 20);
doReturn(ImmutableList.of(createSubscription("nakadi_archiver", "nakadi_to_s3")))
.when(subscriptionDbRepository)
.listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 1);
doReturn(Lists.emptyList())
.when(subscriptionDbRepository)
.listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 20, 20);
doReturn("nakadi_archiver").when(nakadiSettings).getDeletableSubscriptionOwningApplication();
doReturn("nakadi_to_s3").when(nakadiSettings).getDeletableSubscriptionConsumerGroup();

when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.EVENT_TYPE_DELETION_ONLY_ADMINS))
.thenReturn(true);

eventTypeService.delete(eventType.getName());
// no exception should be thrown
}

@Test
public void testFeatureToggleForbidsDeleteEventTypeWithoutAuthzSection() throws Exception {
final EventType eventType = buildDefaultEventType();

doReturn(Optional.of(eventType)).when(eventTypeRepository).findByNameO(eventType.getName());
doReturn(ImmutableList.of(createSubscription("nakadi_archiver", "nakadi_to_s3")))
.when(subscriptionDbRepository)
.listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 20);
doReturn(Lists.emptyList())
.when(subscriptionDbRepository)
.listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 20, 20);
doReturn("nakadi_archiver").when(nakadiSettings).getDeletableSubscriptionOwningApplication();
doReturn("nakadi_to_s3").when(nakadiSettings).getDeletableSubscriptionConsumerGroup();

when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.EVENT_TYPE_DELETION_ONLY_ADMINS))
.thenReturn(true);

try {
eventTypeService.delete(eventType.getName());
} catch (AccessDeniedException e) {
return;
}
fail("Should throw AccessDeniedException");
}

@Test
public void testFeatureToggleForbidsDeleteEventTypeWithNonDeletableSubscription() throws Exception {
final EventType eventType = buildDefaultEventType();
eventType.setAuthorization(buildResourceAuthorization());

doReturn(Optional.of(eventType)).when(eventTypeRepository).findByNameO(eventType.getName());
doReturn(ImmutableList.of(createSubscription("someone", "something")))
.when(subscriptionDbRepository)
.listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 20);
doReturn(Lists.emptyList())
.when(subscriptionDbRepository)
.listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 20, 20);
doReturn("nakadi_archiver").when(nakadiSettings).getDeletableSubscriptionOwningApplication();
doReturn("nakadi_to_s3").when(nakadiSettings).getDeletableSubscriptionConsumerGroup();

when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.EVENT_TYPE_DELETION_ONLY_ADMINS))
.thenReturn(true);

try {
eventTypeService.delete(eventType.getName());
} catch (AccessDeniedException e) {
return;
}
fail("Should throw AccessDeniedException");
}

@Test(expected = FeatureNotAvailableException.class)
public void testFeatureToggleDisableLogCompaction() {
final EventType eventType = buildDefaultEventType();
Expand Down
16 changes: 15 additions & 1 deletion src/test/java/org/zalando/nakadi/utils/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@
import org.zalando.nakadi.domain.BatchFactory;
import org.zalando.nakadi.domain.BatchItem;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.domain.ResourceAuthorization;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.domain.storage.Storage;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.plugin.api.authz.AuthorizationAttribute;
import org.zalando.nakadi.plugin.api.authz.AuthorizationService;
import org.zalando.nakadi.plugin.api.authz.Resource;
import org.zalando.nakadi.problem.ValidationProblem;
import org.zalando.nakadi.service.NakadiKpiPublisher;
import org.zalando.problem.Problem;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -141,6 +144,13 @@ public static EventType buildDefaultEventType() {
return EventTypeTestBuilder.builder().build();
}

public static ResourceAuthorization buildResourceAuthorization() {
final List<AuthorizationAttribute> admins = new ArrayList<>();
final List<AuthorizationAttribute> readers = new ArrayList<>();
final List<AuthorizationAttribute> writers = new ArrayList<>();
return new ResourceAuthorization(admins, readers, writers);
}

public static AccessDeniedException mockAccessDeniedException() {
final Resource resource = mock(Resource.class);
when(resource.getName()).thenReturn("some-name");
Expand Down Expand Up @@ -224,6 +234,10 @@ public static List<Subscription> createRandomSubscriptions(final int count) {
return createRandomSubscriptions(count, randomTextString());
}

public static Subscription createSubscription(final String owningApp, final String consumerGroup) {
return builder().withConsumerGroup(consumerGroup).withOwningApplication(owningApp).build();
}

public static Timeline buildTimeline(final String etName) {
return new Timeline(etName, 0, new Storage("ccc", Storage.Type.KAFKA), randomUUID(), new Date());
}
Expand Down

0 comments on commit 41fd11a

Please sign in to comment.