diff --git a/README.adoc b/README.adoc index 76c2db777..2e5ba21c3 100644 --- a/README.adoc +++ b/README.adoc @@ -139,7 +139,7 @@ spec: annotations: scheduler.alpha.kubernetes.io/critical-pod: "" # <10> ---- -<1> The default strategy is `allInOne`. The only other possible value is `production`. +<1> The default strategy is `allInOne`. The only other possible values are `production` and `streaming`. <2> The image to use, in a regular Docker syntax <3> The (non-storage related) options to be passed verbatim to the underlying binary. Refer to the Jaeger documentation and/or to the `--help` option from the related binary for all the available options. <4> The option is a simple `key: value` map. In this case, we want the option `--log-level=debug` to be passed to the binary. @@ -150,6 +150,67 @@ spec: <9> By default, the operator assumes that agents are deployed as sidecars within the target pods. Specifying the strategy as "DaemonSet" changes that and makes the operator deploy the agent as DaemonSet. Note that your tracer client will probably have to override the "JAEGER_AGENT_HOST" env var to use the node's IP. <10> Define annotations to be applied to all deployments (not services). These can be overridden by annotations defined on the individual components. +== Strategies + +As shown in the example above, the Jaeger instance is associated with a strategy. The strategy determines the architecture to be used for the Jaeger backend. + +The available strategies are described in the following sections. + +=== AllInOne (Default) + +This strategy is intended for development, testing and demo purposes. + +The main backend components (agent, collector and query service) are all packaged into a single executable, which is configured by default to use in-memory storage. + +=== Production + +The `production` strategy is intended (as the name suggests) for production environments, where long-term storage of trace data is important and a more scalable, highly available architecture is required. Each of the backend components is therefore separately deployed. + +The agent can be injected as a sidecar on the instrumented application or deployed as a DaemonSet. + +The query and collector services are configured with a supported storage type - currently Cassandra or Elasticsearch. Multiple instances of each of these components can be provisioned as required for performance and resilience purposes. + +The main additional requirement is to provide the details of the storage type and options, e.g. + +[source,yaml] +---- + storage: + type: elasticsearch + options: + es: + server-urls: http://elasticsearch:9200 +---- + +=== Streaming + +The `streaming` strategy is designed to augment the `production` strategy by providing a streaming capability that effectively sits between the collector and the backend storage (e.g. Cassandra or Elasticsearch). This reduces the pressure on the backend storage under high load and enables other trace post-processing capabilities to tap into the real-time span data directly from the streaming platform (Kafka).
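+ +A simplified sketch of the resulting span flow, assuming Elasticsearch as the backend storage: + +[source] +---- +client -> agent -> collector -> kafka -> ingester -> elasticsearch +----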
+ +The only additional information required is to provide the details for accessing the Kafka platform, which is configured in a new `ingester` component: + +[source,yaml] +---- +apiVersion: io.jaegertracing/v1alpha1 +kind: Jaeger +metadata: + name: simple-streaming +spec: + strategy: streaming + ingester: + options: + kafka: # <1> + topic: jaeger-spans + brokers: my-cluster-kafka-brokers.kafka:9092 + ingester: + deadlockInterval: 0 # <2> + storage: + type: elasticsearch + options: + es: + server-urls: http://elasticsearch:9200 +---- +<1> Identifies the kafka configuration used by the collector, to produce the messages, and the ingester to consume the messages +<2> The deadlock interval can be disabled to avoid the ingester being terminated when no messages arrive within the default 1 minute period + == Accessing the UI === Kubernetes @@ -354,7 +415,7 @@ spec: datacenter: "datacenter3" mode: "test" ---- -<1> The same works for `production` +<1> The same works for `production` and `streaming` <2> These options are for the regular Jaeger components, like `collector` and `query` <3> The options for the `create-schema` job diff --git a/deploy/examples/simple-streaming.yaml b/deploy/examples/simple-streaming.yaml new file mode 100644 index 000000000..88b4f2da1 --- /dev/null +++ b/deploy/examples/simple-streaming.yaml @@ -0,0 +1,24 @@ +# setup an elasticsearch with `make es` +# setup a kafka platform using https://strimzi.io with quickstart instructions +apiVersion: io.jaegertracing/v1alpha1 +kind: Jaeger +metadata: + name: simple-streaming +spec: + strategy: streaming + collector: + options: + log-level: debug + ingester: + options: + kafka: + topic: jaeger-spans + brokers: my-cluster-kafka-brokers.kafka:9092 + ingester: + deadlockInterval: 0 + log-level: debug + storage: + type: elasticsearch + options: + es: + server-urls: http://elasticsearch:9200 diff --git a/pkg/apis/io/v1alpha1/jaeger_types.go b/pkg/apis/io/v1alpha1/jaeger_types.go index 67910cf23..a0839ab1d 100644 --- a/pkg/apis/io/v1alpha1/jaeger_types.go +++ b/pkg/apis/io/v1alpha1/jaeger_types.go @@ -50,6 +50,7 @@ type JaegerSpec struct { AllInOne JaegerAllInOneSpec `json:"allInOne"` Query JaegerQuerySpec `json:"query"` Collector JaegerCollectorSpec `json:"collector"` + Ingester JaegerIngesterSpec `json:"ingester"` Agent JaegerAgentSpec `json:"agent"` UI JaegerUISpec `json:"ui"` Sampling JaegerSamplingSpec `json:"sampling"` @@ -111,6 +112,14 @@ type JaegerCollectorSpec struct { JaegerCommonSpec } +// JaegerIngesterSpec defines the options to be used when deploying the ingester +type JaegerIngesterSpec struct { + Size int `json:"size"` + Image string `json:"image"` + Options Options `json:"options"` + JaegerCommonSpec +} + // JaegerAgentSpec defines the options to be used when deploying the agent type JaegerAgentSpec struct { Strategy string `json:"strategy"` // can be either 'DaemonSet' or 'Sidecar' (default) diff --git a/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go index 4a6d4e44e..c5ea5e67c 100644 --- a/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go @@ -229,6 +229,24 @@ func (in *JaegerEsIndexCleanerSpec) DeepCopy() *JaegerEsIndexCleanerSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *JaegerIngesterSpec) DeepCopyInto(out *JaegerIngesterSpec) { + *out = *in + in.Options.DeepCopyInto(&out.Options) + in.JaegerCommonSpec.DeepCopyInto(&out.JaegerCommonSpec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JaegerIngesterSpec. +func (in *JaegerIngesterSpec) DeepCopy() *JaegerIngesterSpec { + if in == nil { + return nil + } + out := new(JaegerIngesterSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JaegerIngressSpec) DeepCopyInto(out *JaegerIngressSpec) { *out = *in @@ -325,6 +343,7 @@ func (in *JaegerSpec) DeepCopyInto(out *JaegerSpec) { in.AllInOne.DeepCopyInto(&out.AllInOne) in.Query.DeepCopyInto(&out.Query) in.Collector.DeepCopyInto(&out.Collector) + in.Ingester.DeepCopyInto(&out.Ingester) in.Agent.DeepCopyInto(&out.Agent) in.UI.DeepCopyInto(&out.UI) in.Sampling.DeepCopyInto(&out.Sampling) diff --git a/pkg/cmd/start/main.go b/pkg/cmd/start/main.go index 783b50514..e3518386a 100644 --- a/pkg/cmd/start/main.go +++ b/pkg/cmd/start/main.go @@ -39,6 +39,9 @@ func NewStartCommand() *cobra.Command { cmd.Flags().String("jaeger-collector-image", "jaegertracing/jaeger-collector", "The Docker image for the Jaeger Collector") viper.BindPFlag("jaeger-collector-image", cmd.Flags().Lookup("jaeger-collector-image")) + cmd.Flags().String("jaeger-ingester-image", "jaegertracing/jaeger-ingester", "The Docker image for the Jaeger Ingester") + viper.BindPFlag("jaeger-ingester-image", cmd.Flags().Lookup("jaeger-ingester-image")) + cmd.Flags().String("jaeger-all-in-one-image", "jaegertracing/all-in-one", "The Docker image for the Jaeger all-in-one") viper.BindPFlag("jaeger-all-in-one-image", cmd.Flags().Lookup("jaeger-all-in-one-image")) diff --git a/pkg/deployment/all-in-one_test.go b/pkg/deployment/all-in-one_test.go index d2b215e9d..8b7ca409c 100644 --- a/pkg/deployment/all-in-one_test.go +++ b/pkg/deployment/all-in-one_test.go @@ -27,11 +27,11 @@ func TestDefaultAllInOneImage(t *testing.T) { assert.Equal(t, "org/custom-all-in-one-image:123", d.Spec.Template.Spec.Containers[0].Image) envvars := []v1.EnvVar{ - v1.EnvVar{ + { Name: "SPAN_STORAGE_TYPE", Value: "", }, - v1.EnvVar{ + { Name: "COLLECTOR_ZIPKIN_HTTP_PORT", Value: "9411", }, @@ -80,27 +80,27 @@ func TestAllInOneVolumeMountsWithVolumes(t *testing.T) { name := "TestAllInOneVolumeMountsWithVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", }, } allInOneVolumes := []v1.Volume{ - v1.Volume{ + { Name: "allInOneVolume", VolumeSource: v1.VolumeSource{}, }, } allInOneVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "allInOneVolume", }, } @@ -138,14 +138,14 @@ func TestAllInOneMountGlobalVolumes(t *testing.T) { name := "TestAllInOneMountGlobalVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } allInOneVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", ReadOnly: true, }, @@ -166,14 +166,14 @@ func TestAllInOneVolumeMountsWithSameName(t *testing.T) { name := "TestAllInOneVolumeMountsWithSameName" globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: true, }, } allInOneVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: false, }, @@ -194,14 +194,14 @@ func 
TestAllInOneVolumeWithSameName(t *testing.T) { name := "TestAllInOneVolumeWithSameName" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data1"}}, }, } allInOneVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}}, }, diff --git a/pkg/deployment/collector.go b/pkg/deployment/collector.go index e17767d1a..84ca66a0c 100644 --- a/pkg/deployment/collector.go +++ b/pkg/deployment/collector.go @@ -2,6 +2,7 @@ package deployment import ( "fmt" + "strings" "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -64,8 +65,15 @@ func (c *Collector) Get() *appsv1.Deployment { }) } + storageType := c.jaeger.Spec.Storage.Type + // If strategy is "streaming", then change storage type + // to Kafka, and the storage options will be used in the Ingester instead + if strings.EqualFold(c.jaeger.Spec.Strategy, "streaming") { + storageType = "kafka" + } options := allArgs(c.jaeger.Spec.Collector.Options, - c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(c.jaeger.Spec.Storage.Type))) + c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(storageType)), + c.jaeger.Spec.Ingester.Options.Filter(storage.OptionsPrefix(storageType))) sampling.Update(c.jaeger, commonSpec, &options) @@ -105,7 +113,7 @@ func (c *Collector) Get() *appsv1.Deployment { Env: []v1.EnvVar{ v1.EnvVar{ Name: "SPAN_STORAGE_TYPE", - Value: c.jaeger.Spec.Storage.Type, + Value: storageType, }, v1.EnvVar{ Name: "COLLECTOR_ZIPKIN_HTTP_PORT", diff --git a/pkg/deployment/collector_test.go b/pkg/deployment/collector_test.go index bab0fb389..8cff8d41e 100644 --- a/pkg/deployment/collector_test.go +++ b/pkg/deployment/collector_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" ) @@ -60,11 +61,11 @@ func TestDefaultCollectorImage(t *testing.T) { assert.Equal(t, "org/custom-collector-image:123", containers[0].Image) envvars := []v1.EnvVar{ - v1.EnvVar{ + { Name: "SPAN_STORAGE_TYPE", Value: "", }, - v1.EnvVar{ + { Name: "COLLECTOR_ZIPKIN_HTTP_PORT", Value: "9411", }, @@ -107,27 +108,27 @@ func TestCollectorVolumeMountsWithVolumes(t *testing.T) { name := "TestCollectorVolumeMountsWithVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", }, } collectorVolumes := []v1.Volume{ - v1.Volume{ + { Name: "collectorVolume", VolumeSource: v1.VolumeSource{}, }, } collectorVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "collectorVolume", }, } @@ -154,14 +155,14 @@ func TestCollectorMountGlobalVolumes(t *testing.T) { name := "TestCollectorMountGlobalVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } collectorVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", ReadOnly: true, }, @@ -182,14 +183,14 @@ func TestCollectorVolumeMountsWithSameName(t *testing.T) { name := "TestCollectorVolumeMountsWithSameName" globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: true, }, } collectorVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: false, }, @@ -210,14 +211,14 @@ func 
TestCollectorVolumeWithSameName(t *testing.T) { name := "TestCollectorVolumeWithSameName" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data1"}}, }, } collectorVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}}, }, @@ -276,3 +277,109 @@ func TestCollectorLabels(t *testing.T) { assert.Equal(t, c.jaeger.Name, dep.Spec.Template.Labels["app.kubernetes.io/instance"]) assert.Equal(t, fmt.Sprintf("%s-collector", c.jaeger.Name), dep.Spec.Template.Labels["app.kubernetes.io/name"]) } + +func TestCollectorWithDirectStorageType(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "TestCollectorWithDirectStorageType", + }, + Spec: v1alpha1.JaegerSpec{ + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "es.server-urls": "http://somewhere", + }), + }, + }, + } + collector := NewCollector(jaeger) + dep := collector.Get() + + envvars := []v1.EnvVar{ + { + Name: "SPAN_STORAGE_TYPE", + Value: "elasticsearch", + }, + { + Name: "COLLECTOR_ZIPKIN_HTTP_PORT", + Value: "9411", + }, + } + assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) + assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 2) + assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[0]) +} + +func TestCollectorWithIngesterStorageType(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "TestCollectorWithIngesterStorageType", + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.topic": "mytopic", + }), + }, + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.brokers": "http://brokers", + "es.server-urls": "http://somewhere", + }), + }, + }, + } + collector := NewCollector(jaeger) + dep := collector.Get() + + envvars := []v1.EnvVar{ + { + Name: "SPAN_STORAGE_TYPE", + Value: "kafka", + }, + { + Name: "COLLECTOR_ZIPKIN_HTTP_PORT", + Value: "9411", + }, + } + assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) + assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3) + assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0]) + assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[1]) +} + +func TestCollectorWithIngesterNoOptionsStorageType(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "TestCollectorWithIngesterNoOptionsStorageType", + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.brokers": "http://brokers", + "es.server-urls": "http://somewhere", + }), + }, + }, + } + collector := NewCollector(jaeger) + dep := collector.Get() + + envvars := []v1.EnvVar{ + { + Name: "SPAN_STORAGE_TYPE", + Value: "kafka", + }, + { + Name: "COLLECTOR_ZIPKIN_HTTP_PORT", + Value: "9411", + }, + } + assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) + assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 2) + assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0]) +} diff --git 
a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go new file mode 100644 index 000000000..251ca15b0 --- /dev/null +++ b/pkg/deployment/ingester.go @@ -0,0 +1,163 @@ +package deployment + +import ( + "fmt" + "strings" + + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/jaegertracing/jaeger-operator/pkg/storage" + "github.com/jaegertracing/jaeger-operator/pkg/util" +) + +// Ingester builds pods for jaegertracing/jaeger-ingester +type Ingester struct { + jaeger *v1alpha1.Jaeger +} + +// NewIngester builds a new Ingester struct based on the given spec +func NewIngester(jaeger *v1alpha1.Jaeger) *Ingester { + if jaeger.Spec.Ingester.Size <= 0 { + jaeger.Spec.Ingester.Size = 1 + } + + if jaeger.Spec.Ingester.Image == "" { + jaeger.Spec.Ingester.Image = fmt.Sprintf("%s:%s", viper.GetString("jaeger-ingester-image"), viper.GetString("jaeger-version")) + } + + return &Ingester{jaeger: jaeger} +} + +// Get returns a ingester pod +func (i *Ingester) Get() *appsv1.Deployment { + if !strings.EqualFold(i.jaeger.Spec.Strategy, "streaming") { + return nil + } + + logrus.Debugf("Assembling a ingester deployment for %v", i.jaeger) + + labels := i.labels() + trueVar := true + replicas := int32(i.jaeger.Spec.Ingester.Size) + + baseCommonSpec := v1alpha1.JaegerCommonSpec{ + Annotations: map[string]string{ + "prometheus.io/scrape": "true", + "prometheus.io/port": "14271", + "sidecar.istio.io/inject": "false", + }, + } + + commonSpec := util.Merge([]v1alpha1.JaegerCommonSpec{i.jaeger.Spec.Ingester.JaegerCommonSpec, i.jaeger.Spec.JaegerCommonSpec, baseCommonSpec}) + + var envFromSource []v1.EnvFromSource + if len(i.jaeger.Spec.Storage.SecretName) > 0 { + envFromSource = append(envFromSource, v1.EnvFromSource{ + SecretRef: &v1.SecretEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: i.jaeger.Spec.Storage.SecretName, + }, + }, + }) + } + + options := allArgs(i.jaeger.Spec.Ingester.Options, + i.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(i.jaeger.Spec.Storage.Type)), + i.jaeger.Spec.Storage.Options.Filter("kafka")) + + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: i.name(), + Namespace: i.jaeger.Namespace, + OwnerReferences: []metav1.OwnerReference{ + metav1.OwnerReference{ + APIVersion: i.jaeger.APIVersion, + Kind: i.jaeger.Kind, + Name: i.jaeger.Name, + UID: i.jaeger.UID, + Controller: &trueVar, + }, + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + Annotations: commonSpec.Annotations, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Image: i.jaeger.Spec.Ingester.Image, + Name: "jaeger-ingester", + Args: options, + Env: []v1.EnvVar{ + v1.EnvVar{ + Name: "SPAN_STORAGE_TYPE", + Value: i.jaeger.Spec.Storage.Type, + }, + }, + VolumeMounts: commonSpec.VolumeMounts, + EnvFrom: envFromSource, + Ports: []v1.ContainerPort{ + { + ContainerPort: 14270, + Name: "hc-http", + }, + { + ContainerPort: 14271, + Name: "metrics-http", + }, + }, + ReadinessProbe: &v1.Probe{ + Handler: v1.Handler{ + HTTPGet: &v1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt(14270), + }, + }, + InitialDelaySeconds: 1, 
+ }, + Resources: commonSpec.Resources, + }}, + Volumes: commonSpec.Volumes, + }, + }, + }, + } +} + +// Services returns a list of services to be deployed along with the ingester deployment +func (i *Ingester) Services() []*v1.Service { + // Return empty list, as a service is not required for this deployment, which also + // simplifies switching between different strategies. + return []*v1.Service{} +} + +func (i *Ingester) labels() map[string]string { + return map[string]string{ + "app": "jaeger", // TODO(jpkroehling): see collector.go in this package + "app.kubernetes.io/name": i.name(), + "app.kubernetes.io/instance": i.jaeger.Name, + "app.kubernetes.io/component": "ingester", + "app.kubernetes.io/part-of": "jaeger", + "app.kubernetes.io/managed-by": "jaeger-operator", + } +} + +func (i *Ingester) name() string { + return fmt.Sprintf("%s-ingester", i.jaeger.Name) +} diff --git a/pkg/deployment/ingester_test.go b/pkg/deployment/ingester_test.go new file mode 100644 index 000000000..465feae7d --- /dev/null +++ b/pkg/deployment/ingester_test.go @@ -0,0 +1,333 @@ +package deployment + +import ( + "fmt" + "testing" + + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" +) + +func init() { + viper.SetDefault("jaeger-version", "1.6") + viper.SetDefault("jaeger-ingester-image", "jaegertracing/jaeger-ingester") +} + +func TestIngesterNotDefined(t *testing.T) { + jaeger := v1alpha1.NewJaeger("TestIngesterNotDefined") + + ingester := NewIngester(jaeger) + assert.Nil(t, ingester.Get()) +} + +func TestIngesterNegativeSize(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterNegativeSize") + jaeger.Spec.Ingester.Size = -1 + + ingester := NewIngester(jaeger) + dep := ingester.Get() + assert.Equal(t, int32(1), *dep.Spec.Replicas) +} + +func TestIngesterDefaultSize(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterDefaultSize") + jaeger.Spec.Ingester.Size = 0 + + ingester := NewIngester(jaeger) + dep := ingester.Get() + assert.Equal(t, int32(1), *dep.Spec.Replicas) +} + +func TestIngesterName(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterName") + ingester := NewIngester(jaeger) + dep := ingester.Get() + assert.Equal(t, "TestIngesterName-ingester", dep.ObjectMeta.Name) +} + +func TestIngesterServices(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterServices") + ingester := NewIngester(jaeger) + svcs := ingester.Services() + assert.Len(t, svcs, 0) +} + +func TestDefaultIngesterImage(t *testing.T) { + viper.Set("jaeger-ingester-image", "org/custom-ingester-image") + viper.Set("jaeger-version", "123") + defer viper.Reset() + + ingester := NewIngester(newIngesterJaeger("TestDefaultIngesterImage")) + dep := ingester.Get() + + containers := dep.Spec.Template.Spec.Containers + assert.Len(t, containers, 1) + assert.Equal(t, "org/custom-ingester-image:123", containers[0].Image) + + envvars := []v1.EnvVar{ + { + Name: "SPAN_STORAGE_TYPE", + Value: "", + }, + } + assert.Equal(t, envvars, containers[0].Env) +} + +func TestIngesterAnnotations(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterAnnotations") + jaeger.Spec.Annotations = map[string]string{ + "name": "operator", + "hello": "jaeger", + } + jaeger.Spec.Ingester.Annotations = map[string]string{ + "hello": "world", // Override top level annotation + "prometheus.io/scrape": "false", // Override implicit value + } + + 
ingester := NewIngester(jaeger) + dep := ingester.Get() + + assert.Equal(t, "operator", dep.Spec.Template.Annotations["name"]) + assert.Equal(t, "false", dep.Spec.Template.Annotations["sidecar.istio.io/inject"]) + assert.Equal(t, "world", dep.Spec.Template.Annotations["hello"]) + assert.Equal(t, "false", dep.Spec.Template.Annotations["prometheus.io/scrape"]) +} + +func TestIngesterSecrets(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterSecrets") + secret := "mysecret" + jaeger.Spec.Storage.SecretName = secret + + ingester := NewIngester(jaeger) + dep := ingester.Get() + + assert.Equal(t, "mysecret", dep.Spec.Template.Spec.Containers[0].EnvFrom[0].SecretRef.LocalObjectReference.Name) +} + +func TestIngesterVolumeMountsWithVolumes(t *testing.T) { + name := "TestIngesterVolumeMountsWithVolumes" + + globalVolumes := []v1.Volume{ + { + Name: "globalVolume", + VolumeSource: v1.VolumeSource{}, + }, + } + + globalVolumeMounts := []v1.VolumeMount{ + { + Name: "globalVolume", + }, + } + + ingesterVolumes := []v1.Volume{ + { + Name: "ingesterVolume", + VolumeSource: v1.VolumeSource{}, + }, + } + + ingesterVolumeMounts := []v1.VolumeMount{ + { + Name: "ingesterVolume", + }, + } + + jaeger := newIngesterJaeger(name) + jaeger.Spec.Volumes = globalVolumes + jaeger.Spec.VolumeMounts = globalVolumeMounts + jaeger.Spec.Ingester.Volumes = ingesterVolumes + jaeger.Spec.Ingester.VolumeMounts = ingesterVolumeMounts + podSpec := NewIngester(jaeger).Get().Spec.Template.Spec + + assert.Len(t, podSpec.Volumes, len(append(ingesterVolumes, globalVolumes...))) + assert.Len(t, podSpec.Containers[0].VolumeMounts, len(append(ingesterVolumeMounts, globalVolumeMounts...))) + + // ingester is first while global is second + assert.Equal(t, "ingesterVolume", podSpec.Volumes[0].Name) + assert.Equal(t, "globalVolume", podSpec.Volumes[1].Name) + assert.Equal(t, "ingesterVolume", podSpec.Containers[0].VolumeMounts[0].Name) + assert.Equal(t, "globalVolume", podSpec.Containers[0].VolumeMounts[1].Name) +} + +func TestIngesterMountGlobalVolumes(t *testing.T) { + name := "TestIngesterMountGlobalVolumes" + + globalVolumes := []v1.Volume{ + { + Name: "globalVolume", + VolumeSource: v1.VolumeSource{}, + }, + } + + ingesterVolumeMounts := []v1.VolumeMount{ + { + Name: "globalVolume", + ReadOnly: true, + }, + } + + jaeger := newIngesterJaeger(name) + jaeger.Spec.Volumes = globalVolumes + jaeger.Spec.Ingester.VolumeMounts = ingesterVolumeMounts + podSpec := NewIngester(jaeger).Get().Spec.Template.Spec + + assert.Len(t, podSpec.Containers[0].VolumeMounts, 1) + // ingester volume is mounted + assert.Equal(t, podSpec.Containers[0].VolumeMounts[0].Name, "globalVolume") +} + +func TestIngesterVolumeMountsWithSameName(t *testing.T) { + name := "TestIngesterVolumeMountsWithSameName" + + globalVolumeMounts := []v1.VolumeMount{ + { + Name: "data", + ReadOnly: true, + }, + } + + ingesterVolumeMounts := []v1.VolumeMount{ + { + Name: "data", + ReadOnly: false, + }, + } + + jaeger := newIngesterJaeger(name) + jaeger.Spec.VolumeMounts = globalVolumeMounts + jaeger.Spec.Ingester.VolumeMounts = ingesterVolumeMounts + podSpec := NewIngester(jaeger).Get().Spec.Template.Spec + + assert.Len(t, podSpec.Containers[0].VolumeMounts, 1) + // ingester volume is mounted + assert.Equal(t, podSpec.Containers[0].VolumeMounts[0].ReadOnly, false) +} + +func TestIngesterVolumeWithSameName(t *testing.T) { + name := "TestIngesterVolumeWithSameName" + + globalVolumes := []v1.Volume{ + { + Name: "data", + VolumeSource: v1.VolumeSource{HostPath: 
&v1.HostPathVolumeSource{Path: "/data1"}}, + }, + } + + ingesterVolumes := []v1.Volume{ + { + Name: "data", + VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}}, + }, + } + + jaeger := newIngesterJaeger(name) + jaeger.Spec.Volumes = globalVolumes + jaeger.Spec.Ingester.Volumes = ingesterVolumes + podSpec := NewIngester(jaeger).Get().Spec.Template.Spec + + assert.Len(t, podSpec.Volumes, 1) + // ingester volume is mounted + assert.Equal(t, podSpec.Volumes[0].VolumeSource.HostPath.Path, "/data2") +} + +func TestIngesterResources(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterResources") + jaeger.Spec.Resources = v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceLimitsCPU: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourceLimitsEphemeralStorage: *resource.NewQuantity(512, resource.DecimalSI), + }, + Requests: v1.ResourceList{ + v1.ResourceRequestsCPU: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourceRequestsEphemeralStorage: *resource.NewQuantity(512, resource.DecimalSI), + }, + } + jaeger.Spec.Ingester.Resources = v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceLimitsCPU: *resource.NewQuantity(2048, resource.BinarySI), + v1.ResourceLimitsMemory: *resource.NewQuantity(123, resource.DecimalSI), + }, + Requests: v1.ResourceList{ + v1.ResourceRequestsCPU: *resource.NewQuantity(2048, resource.BinarySI), + v1.ResourceRequestsMemory: *resource.NewQuantity(123, resource.DecimalSI), + }, + } + + ingester := NewIngester(jaeger) + dep := ingester.Get() + + assert.Equal(t, *resource.NewQuantity(2048, resource.BinarySI), dep.Spec.Template.Spec.Containers[0].Resources.Limits[v1.ResourceLimitsCPU]) + assert.Equal(t, *resource.NewQuantity(2048, resource.BinarySI), dep.Spec.Template.Spec.Containers[0].Resources.Requests[v1.ResourceRequestsCPU]) + assert.Equal(t, *resource.NewQuantity(123, resource.DecimalSI), dep.Spec.Template.Spec.Containers[0].Resources.Limits[v1.ResourceLimitsMemory]) + assert.Equal(t, *resource.NewQuantity(123, resource.DecimalSI), dep.Spec.Template.Spec.Containers[0].Resources.Requests[v1.ResourceRequestsMemory]) + assert.Equal(t, *resource.NewQuantity(512, resource.DecimalSI), dep.Spec.Template.Spec.Containers[0].Resources.Limits[v1.ResourceLimitsEphemeralStorage]) + assert.Equal(t, *resource.NewQuantity(512, resource.DecimalSI), dep.Spec.Template.Spec.Containers[0].Resources.Requests[v1.ResourceRequestsEphemeralStorage]) +} + +func TestIngesterWithStorageType(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "TestIngesterStorageType", + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.topic": "mytopic", + }), + }, + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.brokers": "http://brokers", + "es.server-urls": "http://somewhere", + }), + }, + }, + } + ingester := NewIngester(jaeger) + dep := ingester.Get() + + envvars := []v1.EnvVar{ + { + Name: "SPAN_STORAGE_TYPE", + Value: "elasticsearch", + }, + } + assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) + assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3) + assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[0]) + assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[1]) + assert.Equal(t, "--kafka.brokers=http://brokers", 
dep.Spec.Template.Spec.Containers[0].Args[2]) +} + +func TestIngesterLabels(t *testing.T) { + ingester := NewIngester(newIngesterJaeger("TestIngesterLabels")) + dep := ingester.Get() + assert.Equal(t, "jaeger-operator", dep.Spec.Template.Labels["app.kubernetes.io/managed-by"]) + assert.Equal(t, "ingester", dep.Spec.Template.Labels["app.kubernetes.io/component"]) + assert.Equal(t, ingester.jaeger.Name, dep.Spec.Template.Labels["app.kubernetes.io/instance"]) + assert.Equal(t, fmt.Sprintf("%s-ingester", ingester.jaeger.Name), dep.Spec.Template.Labels["app.kubernetes.io/name"]) +} + +func newIngesterJaeger(name string) *v1alpha1.Jaeger { + return &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "any": "option", + }), + }, + }, + } +} diff --git a/pkg/deployment/query_test.go b/pkg/deployment/query_test.go index 195ebb393..a6dedaf8f 100644 --- a/pkg/deployment/query_test.go +++ b/pkg/deployment/query_test.go @@ -98,27 +98,27 @@ func TestQueryVolumeMountsWithVolumes(t *testing.T) { name := "TestQueryVolumeMountsWithVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", }, } queryVolumes := []v1.Volume{ - v1.Volume{ + { Name: "queryVolume", VolumeSource: v1.VolumeSource{}, }, } queryVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "queryVolume", }, } @@ -144,14 +144,14 @@ func TestQueryMountGlobalVolumes(t *testing.T) { name := "TestQueryMountGlobalVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } queryVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", ReadOnly: true, }, @@ -171,14 +171,14 @@ func TestQueryVolumeMountsWithSameName(t *testing.T) { name := "TestQueryVolumeMountsWithSameName" globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: true, }, } queryVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: false, }, @@ -198,14 +198,14 @@ func TestQueryVolumeWithSameName(t *testing.T) { name := "TestQueryVolumeWithSameName" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data1"}}, }, } queryVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}}, }, diff --git a/pkg/strategy/controller.go b/pkg/strategy/controller.go index 014de8a9f..e47a700f4 100644 --- a/pkg/strategy/controller.go +++ b/pkg/strategy/controller.go @@ -36,6 +36,10 @@ func For(ctx context.Context, jaeger *v1alpha1.Jaeger) S { return newAllInOneStrategy(ctx, jaeger) } + if strings.ToLower(jaeger.Spec.Strategy) == "streaming" { + return newStreamingStrategy(ctx, jaeger) + } + return newProductionStrategy(ctx, jaeger) } @@ -65,7 +69,7 @@ func normalize(jaeger *v1alpha1.Jaeger) { } // normalize the deployment strategy - if strings.ToLower(jaeger.Spec.Strategy) != "production" { + if strings.ToLower(jaeger.Spec.Strategy) != "production" && strings.ToLower(jaeger.Spec.Strategy) != "streaming" { jaeger.Spec.Strategy = "allInOne" } diff --git a/pkg/strategy/controller_test.go b/pkg/strategy/controller_test.go index eb65a2298..3305aa67f 100644 --- a/pkg/strategy/controller_test.go +++ 
b/pkg/strategy/controller_test.go @@ -100,6 +100,19 @@ func TestIncompatibleStorageForProduction(t *testing.T) { assert.Equal(t, "allInOne", jaeger.Spec.Strategy) } +func TestIncompatibleStorageForStreaming(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", + Storage: v1alpha1.JaegerStorageSpec{ + Type: "memory", + }, + }, + } + normalize(jaeger) + assert.Equal(t, "allInOne", jaeger.Spec.Strategy) +} + func TestDeprecatedAllInOneStrategy(t *testing.T) { jaeger := &v1alpha1.Jaeger{ Spec: v1alpha1.JaegerSpec{ diff --git a/pkg/strategy/streaming.go b/pkg/strategy/streaming.go new file mode 100644 index 000000000..c9941802b --- /dev/null +++ b/pkg/strategy/streaming.go @@ -0,0 +1,127 @@ +package strategy + +import ( + "context" + "strings" + + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jaegertracing/jaeger-operator/pkg/account" + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/jaegertracing/jaeger-operator/pkg/config/sampling" + "github.com/jaegertracing/jaeger-operator/pkg/config/ui" + "github.com/jaegertracing/jaeger-operator/pkg/cronjob" + "github.com/jaegertracing/jaeger-operator/pkg/deployment" + "github.com/jaegertracing/jaeger-operator/pkg/ingress" + "github.com/jaegertracing/jaeger-operator/pkg/inject" + "github.com/jaegertracing/jaeger-operator/pkg/route" + "github.com/jaegertracing/jaeger-operator/pkg/storage" +) + +type streamingStrategy struct { + ctx context.Context + jaeger *v1alpha1.Jaeger +} + +func newStreamingStrategy(ctx context.Context, jaeger *v1alpha1.Jaeger) *streamingStrategy { + return &streamingStrategy{ + ctx: ctx, + jaeger: jaeger, + } +} + +func (c *streamingStrategy) Create() []runtime.Object { + // TODO: Look at ways to refactor this, with the production strategy Create(), to reuse + // common elements. 
+ collector := deployment.NewCollector(c.jaeger) + query := deployment.NewQuery(c.jaeger) + agent := deployment.NewAgent(c.jaeger) + ingester := deployment.NewIngester(c.jaeger) + os := []runtime.Object{} + + // add all service accounts + for _, acc := range account.Get(c.jaeger) { + os = append(os, acc) + } + + // add the config map + cm := configmap.NewUIConfig(c.jaeger).Get() + if nil != cm { + os = append(os, cm) + } + + // add the Sampling config map + scmp := sampling.NewConfig(c.jaeger).Get() + if nil != scmp { + os = append(os, scmp) + } + + // add the deployments + os = append(os, + collector.Get(), + inject.OAuthProxy(c.jaeger, query.Get()), + ) + + ingesterDeployment := ingester.Get() + if ingesterDeployment != nil { + os = append(os, ingesterDeployment) + } + + if ds := agent.Get(); nil != ds { + os = append(os, ds) + } + + // add the services + for _, svc := range collector.Services() { + os = append(os, svc) + } + + for _, svc := range query.Services() { + os = append(os, svc) + } + + for _, svc := range ingester.Services() { + os = append(os, svc) + } + + // add the routes/ingresses + if viper.GetString("platform") == v1alpha1.FlagPlatformOpenShift { + if q := route.NewQueryRoute(c.jaeger).Get(); nil != q { + os = append(os, q) + } + } else { + if q := ingress.NewQueryIngress(c.jaeger).Get(); nil != q { + os = append(os, q) + } + } + + if isBoolTrue(c.jaeger.Spec.Storage.SparkDependencies.Enabled) { + if cronjob.SupportedStorage(c.jaeger.Spec.Storage.Type) { + os = append(os, cronjob.CreateSparkDependencies(c.jaeger)) + } else { + logrus.WithField("type", c.jaeger.Spec.Storage.Type).Warn("Skipping spark dependencies job due to unsupported storage.") + } + } + + if isBoolTrue(c.jaeger.Spec.Storage.EsIndexCleaner.Enabled) { + if strings.ToLower(c.jaeger.Spec.Storage.Type) == "elasticsearch" { + os = append(os, cronjob.CreateEsIndexCleaner(c.jaeger)) + } else { + logrus.WithField("type", c.jaeger.Spec.Storage.Type).Warn("Skipping Elasticsearch index cleaner job due to unsupported storage.") + } + } + + return os +} + +func (c *streamingStrategy) Update() []runtime.Object { + logrus.Debug("Update isn't yet available") + return []runtime.Object{} +} + +func (c *streamingStrategy) Dependencies() []batchv1.Job { + return storage.Dependencies(c.jaeger) +} diff --git a/pkg/strategy/streaming_test.go b/pkg/strategy/streaming_test.go new file mode 100644 index 000000000..1554ee7e3 --- /dev/null +++ b/pkg/strategy/streaming_test.go @@ -0,0 +1,199 @@ +package strategy + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/jaegertracing/jaeger-operator/pkg/storage" +) + +func init() { + viper.SetDefault("jaeger-version", "1.6") + viper.SetDefault("jaeger-agent-image", "jaegertracing/jaeger-agent") +} + +func TestCreateStreamingDeployment(t *testing.T) { + name := "TestCreateStreamingDeployment" + c := newStreamingStrategy(context.TODO(), v1alpha1.NewJaeger(name)) + objs := c.Create() + assertDeploymentsAndServicesForStreaming(t, name, objs, false, false, false) +} + +func TestCreateStreamingDeploymentOnOpenShift(t *testing.T) { + viper.Set("platform", "openshift") + defer viper.Reset() + name := "TestCreateStreamingDeploymentOnOpenShift" + + jaeger := v1alpha1.NewJaeger(name) + normalize(jaeger) + + c := newStreamingStrategy(context.TODO(), jaeger) + objs := 
c.Create() + assertDeploymentsAndServicesForStreaming(t, name, objs, false, true, false) +} + +func TestCreateStreamingDeploymentWithDaemonSetAgent(t *testing.T) { + name := "TestCreateStreamingDeploymentWithDaemonSetAgent" + + j := v1alpha1.NewJaeger(name) + j.Spec.Agent.Strategy = "DaemonSet" + + c := newStreamingStrategy(context.TODO(), j) + objs := c.Create() + assertDeploymentsAndServicesForStreaming(t, name, objs, true, false, false) +} + +func TestCreateStreamingDeploymentWithUIConfigMap(t *testing.T) { + name := "TestCreateStreamingDeploymentWithUIConfigMap" + + j := v1alpha1.NewJaeger(name) + j.Spec.UI.Options = v1alpha1.NewFreeForm(map[string]interface{}{ + "tracking": map[string]interface{}{ + "gaID": "UA-000000-2", + }, + }) + + c := newStreamingStrategy(context.TODO(), j) + objs := c.Create() + assertDeploymentsAndServicesForStreaming(t, name, objs, false, false, true) +} + +func TestUpdateStreamingDeployment(t *testing.T) { + name := "TestUpdateStreamingDeployment" + c := newStreamingStrategy(context.TODO(), v1alpha1.NewJaeger(name)) + assert.Len(t, c.Update(), 0) +} + +func TestStreamingOptionsArePassed(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + TypeMeta: metav1.TypeMeta{ + Kind: "Jaeger", + APIVersion: "io.jaegertracing/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-prod", + Namespace: "simple-prod-ns", + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.topic": "mytopic", + }), + }, + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "es.server-urls": "http://elasticsearch.default.svc:9200", + "es.username": "elastic", + "es.password": "changeme", + }), + }, + }, + } + + ctrl := For(context.TODO(), jaeger) + objs := ctrl.Create() + deployments := getDeployments(objs) + for _, dep := range deployments { + args := dep.Spec.Template.Spec.Containers[0].Args + // Only the query and ingester should have the ES parameters defined + var escount int + for _, arg := range args { + if strings.Contains(arg, "es.") { + escount++ + } + } + if strings.Contains(dep.Name, "collector") { + // Including parameters for sampling config and kafka topic + assert.Len(t, args, 2) + assert.Equal(t, 0, escount) + } else if strings.Contains(dep.Name, "ingester") { + // Including parameters for ES and kafka topic + assert.Len(t, args, 4) + assert.Equal(t, 3, escount) + + } else { + // Including parameters for ES only + assert.Len(t, args, 3) + assert.Equal(t, 3, escount) + } + } +} + +func TestDelegateStreamingDepedencies(t *testing.T) { + // for now, we just have storage dependencies + c := newStreamingStrategy(context.TODO(), v1alpha1.NewJaeger("TestDelegateStreamingDepedencies")) + assert.Equal(t, c.Dependencies(), storage.Dependencies(c.jaeger)) +} + +func assertDeploymentsAndServicesForStreaming(t *testing.T, name string, objs []runtime.Object, hasDaemonSet bool, hasOAuthProxy bool, hasConfigMap bool) { + expectedNumObjs := 6 + + if hasDaemonSet { + expectedNumObjs++ + } + + if hasOAuthProxy { + expectedNumObjs++ + } + + if hasConfigMap { + expectedNumObjs++ + } + + assert.Len(t, objs, expectedNumObjs) + + deployments := map[string]bool{ + fmt.Sprintf("%s-collector", name): false, + fmt.Sprintf("%s-query", name): false, + } + + daemonsets := map[string]bool{ + fmt.Sprintf("%s-agent-daemonset", name): !hasDaemonSet, + } + + services := map[string]bool{ + fmt.Sprintf("%s-collector", name): false, + 
fmt.Sprintf("%s-query", name): false, + } + + ingresses := map[string]bool{} + routes := map[string]bool{} + if viper.GetString("platform") == v1alpha1.FlagPlatformOpenShift { + routes[name] = false + } else { + ingresses[fmt.Sprintf("%s-query", name)] = false + } + + serviceAccounts := map[string]bool{} + if hasOAuthProxy { + serviceAccounts[fmt.Sprintf("%s-ui-proxy", name)] = false + } + + configMaps := map[string]bool{} + if hasConfigMap { + configMaps[fmt.Sprintf("%s-ui-configuration", name)] = false + } + assertHasAllObjects(t, name, objs, deployments, daemonsets, services, ingresses, routes, serviceAccounts, configMaps) +} + +func TestSparkDependenciesStreaming(t *testing.T) { + testSparkDependencies(t, func(jaeger *v1alpha1.Jaeger) S { + return &streamingStrategy{jaeger: jaeger} + }) +} + +func TestEsIndexCleanerStreaming(t *testing.T) { + testEsIndexCleaner(t, func(jaeger *v1alpha1.Jaeger) S { + return &streamingStrategy{jaeger: jaeger} + }) +}