Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

find disk damage, report immediately #183

Merged
merged 15 commits into from
Feb 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ map<TTaskType::type, map<string, uint32_t>> TaskWorkerPool::_s_running_task_user
// Out-of-class definitions of TaskWorkerPool's static data members.
map<TTaskType::type, map<string, uint32_t>> TaskWorkerPool::_s_total_task_user_count;
map<TTaskType::type, uint32_t> TaskWorkerPool::_s_total_task_count;
FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache;
// Mutex the report worker threads lock while waiting on
// OLAPRootPath::disk_broken_cv.
boost::mutex TaskWorkerPool::_disk_broken_lock;
// Timeout handed to disk_broken_cv.timed_wait() by the report worker threads.
// Being static, this single value is shared by every TaskWorkerPool instance;
// start() assigns it for both REPORT_DISK_STATE and REPORT_OLAP_TABLE.
// NOTE(review): both assignments read config::report_disk_state_interval_seconds,
// while the olap-table worker previously slept for
// config::report_olap_table_interval_seconds -- confirm this is intended.
boost::posix_time::time_duration TaskWorkerPool::_wait_duration;

TaskWorkerPool::TaskWorkerPool(
const TaskWorkerType task_worker_type,
Expand Down Expand Up @@ -146,10 +148,12 @@ void TaskWorkerPool::start() {
_callback_function = _report_task_worker_thread_callback;
break;
case TaskWorkerType::REPORT_DISK_STATE:
_wait_duration = boost::posix_time::time_duration(0, 0, config::report_disk_state_interval_seconds, 0);
_worker_count = REPORT_DISK_STATE_WORKER_COUNT;
_callback_function = _report_disk_state_worker_thread_callback;
break;
case TaskWorkerType::REPORT_OLAP_TABLE:
_wait_duration = boost::posix_time::time_duration(0, 0, config::report_disk_state_interval_seconds, 0);
_worker_count = REPORT_OLAP_TABLE_WORKER_COUNT;
_callback_function = _report_olap_table_worker_thread_callback;
break;
Expand Down Expand Up @@ -1525,15 +1529,6 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)

OLAPStatus get_all_root_path_stat =
worker_pool_this->_command_executor->get_all_root_path_stat(&root_paths_stat);
if (get_all_root_path_stat != OLAPStatus::OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to get all root path stat.");
#ifndef BE_TEST
sleep(config::report_disk_state_interval_seconds);
continue;
#else
return (void*)0;
#endif
}

map<string, TDisk> disks;
for (auto root_path_state : root_paths_stat) {
Expand All @@ -1558,7 +1553,15 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
}

#ifndef BE_TEST
sleep(config::report_disk_state_interval_seconds);
{
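// Wait on disk_broken_cv for at most _wait_duration.
// If woken before the timeout, set is_report_disk_state_already so the disk
// monitor can see that this worker received the notification.
// On timeout, simply fall through to the next iteration of the loop.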
boost::unique_lock<boost::mutex> lk(_disk_broken_lock);
if (OLAPRootPath::get_instance()->disk_broken_cv.timed_wait(lk, _wait_duration)) {
OLAPRootPath::get_instance()->is_report_disk_state_already = true;
}
}
}
#endif

Expand Down Expand Up @@ -1588,7 +1591,13 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this)
OLAP_LOG_WARNING("report get all tablets info failed. status: %d",
report_all_tablets_info_status);
#ifndef BE_TEST
sleep(config::report_olap_table_interval_seconds);
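// Wait on disk_broken_cv for at most _wait_duration.
// If woken before the timeout, set is_report_olap_table_already so the disk
// monitor can see that this worker received the notification.
// On timeout, simply continue with the next iteration of the loop.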
boost::unique_lock<boost::mutex> lk(_disk_broken_lock);
if (OLAPRootPath::get_instance()->disk_broken_cv.timed_wait(lk, _wait_duration)) {
OLAPRootPath::get_instance()->is_report_olap_table_already = true;
}
continue;
#else
return (void*)0;
Expand All @@ -1606,7 +1615,13 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this)
}

#ifndef BE_TEST
sleep(config::report_olap_table_interval_seconds);
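// Wait on disk_broken_cv for at most _wait_duration.
// If woken before the timeout, set is_report_olap_table_already so the disk
// monitor can see that this worker received the notification.
// On timeout, simply fall through to the next iteration of the loop.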
boost::unique_lock<boost::mutex> lk(_disk_broken_lock);
if (OLAPRootPath::get_instance()->disk_broken_cv.timed_wait(lk, _wait_duration)) {
OLAPRootPath::get_instance()->is_report_olap_table_already = true;
}
}
#endif

Expand Down
7 changes: 7 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#define BDG_PALO_BE_SRC_TASK_WORKER_POOL_H

#include <atomic>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <deque>
#include <utility>
#include <vector>
Expand All @@ -27,6 +30,7 @@
#include "gen_cpp/HeartbeatService_types.h"
#include "olap/command_executor.h"
#include "olap/olap_define.h"
#include "olap/olap_rootpath.h"
#include "olap/utils.h"

namespace palo {
Expand Down Expand Up @@ -147,6 +151,9 @@ class TaskWorkerPool {
static MutexLock _s_running_task_user_count_lock;
static FrontendServiceClientCache _master_service_client_cache;

static boost::mutex _disk_broken_lock;
static boost::posix_time::time_duration _wait_duration;

DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool);
}; // class TaskWorkerPool
} // namespace palo
Expand Down
26 changes: 25 additions & 1 deletion be/src/olap/olap_rootpath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ OLAPRootPath::OLAPRootPath() :
_total_storage_medium_type_count(0),
_available_storage_medium_type_count(0),
_effective_cluster_id(-1),
_is_all_cluster_id_exist(true) {}
_is_all_cluster_id_exist(true),
_is_drop_tables(false),
is_report_disk_state_already(false),
is_report_olap_table_already(false) {}

OLAPRootPath::~OLAPRootPath() {
clear();
Expand Down Expand Up @@ -435,8 +438,25 @@ void OLAPRootPath::start_disk_stat_monitor() {
_start_check_disks();
_detect_unused_flag();
_delete_tables_on_unused_root_path();

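// If tables were dropped, notify the disk-state and olap-table report worker
// threads, and keep _is_drop_tables set until both workers have acknowledged
// the wakeup through their is_report_*_already flags.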
if (_is_drop_tables) {
disk_broken_cv.notify_all();

bool is_report_disk_state_expected = true;
bool is_report_olap_table_expected = true;
bool is_report_disk_state_exchanged =
is_report_disk_state_already.compare_exchange_strong(is_report_disk_state_expected, false);
bool is_report_olap_table_exchanged =
is_report_olap_table_already.compare_exchange_strong(is_report_olap_table_expected, false);
if (is_report_disk_state_exchanged && is_report_olap_table_exchanged) {
_is_drop_tables = false;
}
}
}


void OLAPRootPath::_start_check_disks() {
OLAPRootPath::RootPathVec all_available_root_path;
get_all_available_root_path(&all_available_root_path);
Expand Down Expand Up @@ -940,6 +960,10 @@ void OLAPRootPath::_delete_tables_on_unused_root_path() {
exit(0);
}

if (!table_info_vec.empty()) {
_is_drop_tables = true;
}

OLAPEngine::get_instance()->drop_tables_on_error_root_path(table_info_vec);
}

Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/olap_rootpath.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#ifndef BDG_PALO_BE_SRC_OLAP_OLAP_ROOTPATH_H
#define BDG_PALO_BE_SRC_OLAP_OLAP_ROOTPATH_H

#include <atomic>
#include <boost/thread/condition_variable.hpp>
#include <list>
#include <memory>
#include <queue>
Expand Down Expand Up @@ -130,6 +132,10 @@ class OLAPRootPath {
const std::vector<std::string>& root_path_vec,
const std::vector<bool>& is_accessable_vec);

boost::condition_variable disk_broken_cv;
std::atomic_bool is_report_disk_state_already;
std::atomic_bool is_report_olap_table_already;

private:
struct RootPathInfo {
RootPathInfo():
Expand Down Expand Up @@ -221,6 +227,7 @@ class OLAPRootPath {

int32_t _effective_cluster_id;
bool _is_all_cluster_id_exist;
bool _is_drop_tables;

// Percentage of disks in an error state; once it exceeds the configured value, the engine must exit.
uint32_t _min_percentage_of_error_disk;
Expand Down