
Kafka module #2969

Merged 1 commit into elastic:master on Nov 16, 2016

Conversation

ruflin (Member) commented on Nov 9, 2016:

No description provided.

ruflin added the "in progress" and "Metricbeat" labels on Nov 9, 2016
ruflin mentioned this pull request on Nov 10, 2016
ruflin force-pushed the kafka-module branch 3 times, most recently from b0e1154 to 89cabf1 on November 10, 2016 09:46
ruflin added the "review" label and removed the "in progress" label on Nov 10, 2016

  type: list
  description: >
    List of replicas ids
- name: broker

urso (Reviewer):

this field appears twice

ruflin (Member, Author):

removed

- name: brocker
  type: integer
  description: >
    Broker id

urso (Reviewer):

Should we publish just the broker ID, or also the address? Sure, the broker ID is not supposed to change within a cluster, but one might prefer to search by hostname.

This metadata is available from Kafka: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse

The address can be found via Broker.Addr in sarama.

ruflin (Member, Author):

Good idea. I also added the address.
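
A minimal sketch of this point, assuming a connected sarama.Client named client (the logging call and names are illustrative only): the cluster metadata already carries every broker's ID and address, so both can be published.

// Sketch: list every broker known from the cluster metadata.
for _, broker := range client.Brokers() {
	// ID() is stable within the cluster; Addr() is the host:port one can search by.
	logp.Info("kafka broker %v at %v", broker.ID(), broker.Addr())
}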

- name: replicas
  type: list
  description: >
    List of replicas ids

urso (Reviewer):

To make it clearer: is this the set of alive nodes (broker IDs)?

Are you planning to integrate ISR nodes? In Kafka, some follower nodes might not be in sync with the leader broker yet.

ruflin (Member, Author):

Changed it. I saw that there are some differences between the nodes and was thinking about adding a "status" field, but not for the first version.
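
A sketch of how such a per-replica "status" could be derived later, assuming a sarama release that exposes Client.InSyncReplicas and the client/topic/partition variables from the fetch loop (deferred, not part of this PR):

replicas, err := client.Replicas(topic, partition)
if err != nil {
	logp.Err("Fetch replicas for partition %d in topic %s: %s", partition, topic, err)
}
isr, err := client.InSyncReplicas(topic, partition)
if err != nil {
	logp.Err("Fetch ISR for partition %d in topic %s: %s", partition, topic, err)
}

// Mark which replica IDs are currently in sync with the leader.
inSync := make(map[int32]bool, len(isr))
for _, id := range isr {
	inSync[id] = true
}

// One entry per replica with a hypothetical "status" field.
replicaInfo := make([]common.MapStr, 0, len(replicas))
for _, id := range replicas {
	status := "out_of_sync"
	if inSync[id] {
		status = "in_sync"
	}
	replicaInfo = append(replicaInfo, common.MapStr{"id": id, "status": status})
}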

config.Net.ReadTimeout = m.Module().Config().Timeout
config.ClientID = "metricbeat"

client, err := sarama.NewClient([]string{m.Host()}, config)

urso (Reviewer):

Error check missing?

Why just one host? Often one configures a set of 'bootstrap' endpoints to query metadata. If one fails, ask the next endpoint.

ruflin (Member, Author):

error check added.

ruflin (Member, Author):

Having one host per metricset request is the usual pattern in Metricbeat. For simplicity reasons I would not change this now for Kafka.

But this brings up an interesting question: assuming each Kafka host is monitored with a list of hosts, lots of data will currently be duplicated in Elasticsearch, as data is fetched not only from the partitions on the host the client connects to.

urso (Reviewer):

In Kafka (in comparison to other services), you do not monitor one host; you monitor one cluster. That is, when configuring the hosts, all hosts MUST belong to the same cluster. Having multiple bootstrapping hosts is for redundancy, to decrease the chance of errors.

ruflin (Member, Author):

For the first version we should recommend in the docs to only use one host, but think long term about how we handle these kinds of situations in Metricbeat, as this will not only apply to Kafka.

urso (Reviewer):

Hmmm... is it required by the module interface to have a Host() string method? Why not let the module/metricset decide?

ruflin (Member, Author):

If the issue is Kafka-specific, we could also have a bootstrap_hosts: [...] config option for the Kafka module. Let's handle this as soon as it pops up as an issue.
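
A sketch of the connection setup being discussed, with the error check in place and a hypothetical bootstrap-host list (bootstrapHosts and the return statement are illustrative, not the module's actual code):

config := sarama.NewConfig()
config.Net.ReadTimeout = m.Module().Config().Timeout
config.ClientID = "metricbeat"

// Hypothetical: a list of bootstrap endpoints instead of the single m.Host();
// sarama tries the next endpoint if fetching metadata from one of them fails.
bootstrapHosts := []string{m.Host()}

client, err := sarama.NewClient(bootstrapHosts, config)
if err != nil {
	return nil, err
}
defer client.Close()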

for _, topic := range topics {
	partitions, err := client.Partitions(topic)
	if err != nil {
		logp.Err("Fetch partition info for topic %s: %s", topic, err)

urso (Reviewer):

Is there a chance of partitions not being empty when err is set? Do we still want to process potentially incomplete data?

ruflin (Member, Author):

Looking at the Sarama code, it looks like partitions is always nil if an error is returned.

urso (Reviewer):

I still wonder if we want to return an error here. By returning an error, an error event will be published, right?

ruflin (Member, Author):

It will return an error event. But the problem is that it will abort the fetch for all remaining topics.

urso (Reviewer):

In this case, we might consider adding an error event to events here.

ruflin (Member, Author):

That would mean we would have an "error" entry in the wrong namespace of the event. I will open a separate issue to discuss this improvement for Metricbeat.

ruflin (Member, Author):

Here is the related issue: #3004
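
A sketch of the "log and keep going" behaviour discussed in this thread, so that a failing topic does not abort the fetch for the remaining topics (the inner loop body is elided):

for _, topic := range topics {
	partitions, err := client.Partitions(topic)
	if err != nil {
		logp.Err("Fetch partition info for topic %s: %s", topic, err)
		continue // skip this topic, keep collecting the others
	}
	for _, partition := range partitions {
		// ... build one event per partition ...
		_ = partition
	}
}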

replicas, err := client.Replicas(topic, partition)
if err != nil {
	logp.Err("Fetch replicas for partition %s in topic %s: %s", partition, topic, err)
}

ruflin (Member, Author):

I'm not even sure whether we should provide the replica IDs here or not.

urso (Reviewer):

I'd say yes, as these are part of the cluster state.
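
A sketch of how the replica IDs could end up in the event, reusing the topic/partition/replicas values from the fetch loop (field names follow the snippets above):

event := common.MapStr{
	"topic":     topic,
	"partition": partition,
	"replicas":  replicas, // []int32 as returned by client.Replicas
}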

ruflin (Member, Author) commented on Nov 11, 2016:

@urso New version pushed.

ruflin force-pushed the kafka-module branch 3 times, most recently from ea68d8f to 9d667c0 on November 14, 2016 17:21

ruflin (Member, Author) commented on Nov 14, 2016:

jenkins, retest it

"id": brocker.ID(),
"address": brocker.Addr(),
},
}

urso (Reviewer):

Hmm... I was hoping to get the offset per replica and create an event per replica. The broker is the current leader of a partition. All other brokers handling the same partition are the replicas. Every now and then the leader is re-elected, turning that broker into a replica and another former replica into the leader.

Some event like:

common.MapStr {
  "topic": topic,
  "partition": partition,
  "broker": common.MapStr {
    "id": id,
    "address": brokers[id],
  },
  "offset": offset,
  "leader": id == leaderID,
}

The address and ID of every broker are included in the metadata.

ruflin (Member, Author):

Follow-up GitHub issue: #3005
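
A sketch of the per-partition event shape proposed above, resolving the current leader and the newest offset via sarama; it assumes it runs inside the partition loop with an events slice, and the per-replica variant is the follow-up tracked in #3005:

broker, err := client.Leader(topic, partition)
if err != nil {
	logp.Err("Fetch leader for partition %d in topic %s: %s", partition, topic, err)
	continue
}
offset, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
	logp.Err("Fetch offset for partition %d in topic %s: %s", partition, topic, err)
	continue
}

events = append(events, common.MapStr{
	"topic":     topic,
	"partition": partition,
	"broker": common.MapStr{
		"id":      broker.ID(),
		"address": broker.Addr(),
	},
	"offset": offset,
	"leader": true, // this event describes the leader; per-replica events are future work
})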

First module added is partition which contains stats about each existing partition.
urso merged commit 54ad3d2 into elastic:master on Nov 16, 2016
monicasarbu deleted the kafka-module branch on December 5, 2016 10:53