-
Notifications
You must be signed in to change notification settings - Fork 24.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ML][Inference] adding license checks #49056
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,9 +10,12 @@ | |
import org.elasticsearch.action.support.HandledTransportAction; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.license.LicenseUtils; | ||
import org.elasticsearch.license.XPackLicenseState; | ||
import org.elasticsearch.tasks.Task; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
import org.elasticsearch.xpack.core.XPackField; | ||
import org.elasticsearch.xpack.core.ml.action.InferModelAction; | ||
import org.elasticsearch.xpack.core.ml.inference.results.InferenceResults; | ||
import org.elasticsearch.xpack.ml.inference.loadingservice.Model; | ||
|
@@ -24,20 +27,28 @@ public class TransportInferModelAction extends HandledTransportAction<InferModel | |
|
||
private final ModelLoadingService modelLoadingService; | ||
private final Client client; | ||
private final XPackLicenseState licenseState; | ||
|
||
@Inject | ||
public TransportInferModelAction(TransportService transportService, | ||
ActionFilters actionFilters, | ||
ModelLoadingService modelLoadingService, | ||
Client client) { | ||
Client client, | ||
XPackLicenseState licenseState) { | ||
super(InferModelAction.NAME, transportService, actionFilters, InferModelAction.Request::new); | ||
this.modelLoadingService = modelLoadingService; | ||
this.client = client; | ||
this.licenseState = licenseState; | ||
} | ||
|
||
@Override | ||
protected void doExecute(Task task, InferModelAction.Request request, ActionListener<InferModelAction.Response> listener) { | ||
|
||
if (licenseState.isMachineLearningAllowed() == false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do other actions (get, delete) already have this license check? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I don't think they should either. They don't really provide licensed value IMO. This is similar to how we treat anomaly jobs as well. |
||
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); | ||
return; | ||
} | ||
|
||
ActionListener<Model> getModelListener = ActionListener.wrap( | ||
model -> { | ||
TypedChainTaskExecutor<InferenceResults> typedChainTaskExecutor = | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,11 @@ | |
import org.elasticsearch.ingest.Pipeline; | ||
import org.elasticsearch.ingest.PipelineConfiguration; | ||
import org.elasticsearch.ingest.Processor; | ||
import org.elasticsearch.license.LicenseStateListener; | ||
import org.elasticsearch.license.LicenseUtils; | ||
import org.elasticsearch.license.XPackLicenseState; | ||
import org.elasticsearch.rest.RestStatus; | ||
import org.elasticsearch.xpack.core.XPackField; | ||
import org.elasticsearch.xpack.core.ml.action.InferModelAction; | ||
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig; | ||
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceConfig; | ||
|
@@ -144,20 +148,28 @@ public String getType() { | |
return TYPE; | ||
} | ||
|
||
public static final class Factory implements Processor.Factory, Consumer<ClusterState> { | ||
public static final class Factory implements Processor.Factory, Consumer<ClusterState>, LicenseStateListener { | ||
|
||
private static final Logger logger = LogManager.getLogger(Factory.class); | ||
|
||
private final Client client; | ||
private final IngestService ingestService; | ||
private final XPackLicenseState licenseState; | ||
private volatile int currentInferenceProcessors; | ||
private volatile int maxIngestProcessors; | ||
private volatile Version minNodeVersion = Version.CURRENT; | ||
private volatile boolean inferenceAllowed; | ||
|
||
public Factory(Client client, ClusterService clusterService, Settings settings, IngestService ingestService) { | ||
public Factory(Client client, | ||
ClusterService clusterService, | ||
Settings settings, | ||
IngestService ingestService, | ||
XPackLicenseState licenseState) { | ||
this.client = client; | ||
this.maxIngestProcessors = MAX_INFERENCE_PROCESSORS.get(settings); | ||
this.ingestService = ingestService; | ||
this.licenseState = licenseState; | ||
this.inferenceAllowed = licenseState.isMachineLearningAllowed(); | ||
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_INFERENCE_PROCESSORS, this::setMaxIngestProcessors); | ||
} | ||
|
||
|
@@ -199,6 +211,10 @@ int numInferenceProcessors() { | |
public InferenceProcessor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config) | ||
throws Exception { | ||
|
||
if (inferenceAllowed == false) { | ||
throw LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING); | ||
} | ||
|
||
if (this.maxIngestProcessors <= currentInferenceProcessors) { | ||
throw new ElasticsearchStatusException("Max number of inference processors reached, total inference processors [{}]. " + | ||
"Adjust the setting [{}]: [{}] if a greater number is desired.", | ||
|
@@ -272,5 +288,10 @@ void checkSupportedVersion(InferenceConfig config) { | |
minNodeVersion)); | ||
} | ||
} | ||
|
||
@Override | ||
public void licenseStateChanged() { | ||
this.inferenceAllowed = licenseState.isMachineLearningAllowed(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the mechanism which updates There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it is a singleton that gets mutated in place. See other examples in |
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs some due diligence. If there was an ingest pipeline in the cluster state containing an inference ingest processor and the cluster was restarted with
xpack.ml.enabled: false
what would happen?The tentative plan for restoring full cluster snapshots in Cloud in the future is to disable all X-Pack plugins during the snapshot restore, which will lead to this exact situation.
If there's any doubt about what will happen it might be safer to allow the ingest processors to exist but just have them fail on every document they process (via the failure response from the infer model action) if the license is invalid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@droberts195 aren't
xpack.ml.enabled
and the license checks to different things? Additionally, this is exactly what the enrich project has done (if enrich is disabled, do not provide the processors). I will reach out to core features to see what they think about this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@droberts195 pipelines are stored as maps in the cluster state and are not fully marshaled until the pipeline is created on the ingest node. So, clusterstate restoration and pipeline instantiation are two different steps.
The cluster will start up fine, the pipeline will just fail to be instantiated on the node.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK great. Sounds like it's not a problem then and the code can stay as it is.