bigtable-backup: Tool for creating and managing periodic tables stored in Bigtable #729

Merged 3 commits on Aug 2, 2019
14 changes: 14 additions & 0 deletions Makefile
@@ -3,6 +3,7 @@
.PHONY: helm helm-install helm-upgrade helm-publish helm-debug helm-clean
.PHONY: docker-driver docker-driver-clean docker-driver-enable docker-driver-push
.PHONY: push-images push-latest save-images load-images promtail-image loki-image build-image
.PHONY: bigtable-backup push-bigtable-backup
.PHONY: benchmark-store
#############
# Variables #
@@ -315,6 +316,19 @@ docker-driver-clean:
-docker plugin rm grafana/loki-docker-driver:$(IMAGE_TAG)
rm -rf cmd/docker-driver/rootfs

########################
# Bigtable Backup Tool #
########################

BIGTABLE_BACKUP_TOOL_FOLDER = ./tools/bigtable-backup
BIGTABLE_BACKUP_TOOL_TAG ?= $(IMAGE_TAG)

bigtable-backup:
	docker build -t $(IMAGE_PREFIX)/$(shell basename $(BIGTABLE_BACKUP_TOOL_FOLDER)) $(BIGTABLE_BACKUP_TOOL_FOLDER)
	docker tag $(IMAGE_PREFIX)/$(shell basename $(BIGTABLE_BACKUP_TOOL_FOLDER)) $(IMAGE_PREFIX)/loki-bigtable-backup:$(BIGTABLE_BACKUP_TOOL_TAG)

push-bigtable-backup: bigtable-backup
	docker push $(IMAGE_PREFIX)/loki-bigtable-backup:$(BIGTABLE_BACKUP_TOOL_TAG)

##########
# Images #
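A minimal usage sketch for the two new targets (the IMAGE_PREFIX value and tag below are assumptions for illustration, not defaults pinned by this diff):

make bigtable-backup IMAGE_PREFIX=grafana
make push-bigtable-backup IMAGE_PREFIX=grafana BIGTABLE_BACKUP_TOOL_TAG=latest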
7 changes: 7 additions & 0 deletions tools/bigtable-backup/Dockerfile
@@ -0,0 +1,7 @@
FROM grafana/bigtable-backup:master-418c0dd
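# The base image is assumed to provide the `bigtable-backup` CLI that
# bigtable-backup.py invokes via subprocess.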
RUN apk add --update --no-cache python3 python3-dev \
&& pip3 install --no-cache-dir --upgrade pip
COPY bigtable-backup.py bigtable-backup.py
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt
ENTRYPOINT ["usr/bin/python3", "bigtable-backup.py"]
203 changes: 203 additions & 0 deletions tools/bigtable-backup/bigtable-backup.py
@@ -0,0 +1,203 @@
import argparse
import subprocess
import time
import json

from datetime import datetime, timedelta
import pytz
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

registry = CollectorRegistry()
bigtable_backup_job_last_run_seconds = Gauge('bigtable_backup_job_last_run_seconds', 'Last time a bigtable backup job ran', registry=registry)
bigtable_backup_job_last_success_seconds = Gauge('bigtable_backup_job_last_success_seconds', 'Last time a bigtable backup job successfully finished', registry=registry)
bigtable_backup_job_runtime_seconds = Gauge('bigtable_backup_job_runtime_seconds', 'Runtime of last successfully finished bigtable backup job', registry=registry)
Review comment: Can we add 2 more metrics: backups_created and backups_existing?

bigtable_backup_job_backups_created = Gauge('bigtable_backup_job_backups_created', 'Number of backups created during last run', registry=registry)

job_backup_active_periodic_table = "backup-active-periodic-table"
job_ensure_backups = "ensure-backups"

def secs_to_periodic_table_number(periodic_secs):
    return int(time.time() / periodic_secs)
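
# For example, with a 7-day period (604800 s), epoch 1564704000
# (2019-08-02 00:00 UTC) yields int(1564704000 / 604800) = 2587, so the
# active table would be "<prefix>2587".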


def backup_active_periodic_table(args):
push_job_started_metric(args.prom_push_gateway_endpoint, job_backup_active_periodic_table)
start_time = time.time()

table_id = args.bigtable_table_id_prefix + str(int(time.time() / args.periodic_table_duration))
create_backup(table_id, args)

push_job_finished_metric(args.prom_push_gateway_endpoint, job_backup_active_periodic_table, int(time.time() - start_time))


def ensure_backups(args):
push_job_started_metric(args.prom_push_gateway_endpoint, job_ensure_backups)
start_time = time.time()

# ensure-backups job specific metrics
bigtable_backup_job_num_tables_backed_up = Gauge('bigtable_backup_job_num_tables_backed_up', 'Number of table backups found during last run', registry=registry)
bigtable_backup_job_num_backup_ups = Gauge('bigtable_backup_job_num_backup_ups', 'Sum of number of backups per table found during last run', registry=registry)

# Read all the existing backups
popen = subprocess.Popen(['bigtable-backup', 'list-backups', '-ojson', '--backup-path', args.destination_path],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
popen.wait()

# build and push metrics related to existing backups
backups = json.loads(popen.stdout.readline())
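    # `backups` maps each table ID to its list of backup timestamps, e.g.
    # {"loki_index_2587": [1564704000, 1564790400]}; the retention logic
    # below assumes each list is sorted in ascending order.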
    if (args.duration is None and args.period_from is None) or (args.duration is not None and args.period_from is not None):
        raise ValueError("Exactly one of --duration or --period-from must be set")

bigtable_backup_job_num_tables_backed_up.set(len(backups))
for __, timestamps in backups.items():
bigtable_backup_job_num_backup_ups.inc(len(timestamps))

push_metrics(args.prom_push_gateway_endpoint, job_ensure_backups)

    if args.period_from is None:
period_from = datetime.utcnow() - timedelta(days=args.duration)
args.period_from = valid_date(period_from.strftime("%Y-%m-%d"))
args.period_to = valid_date(datetime.utcnow().strftime("%Y-%m-%d"))

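    # Map the requested date range onto periodic table numbers: table N covers
    # the half-open window [N * periodic_table_duration, (N + 1) * periodic_table_duration).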
oldest_table_number = int(args.period_from.timestamp() / args.periodic_table_duration)
newest_table_number = int(args.period_to.timestamp() / args.periodic_table_duration)
    active_table_number = int(time.time() / args.periodic_table_duration)

print("Checking right backups exist")
while oldest_table_number <= newest_table_number:
table_id = args.bigtable_table_id_prefix + str(oldest_table_number)
oldest_table_number += 1
if table_id not in backups:
print("backup for {} not found".format(table_id))
create_backup(table_id, args)
bigtable_backup_job_backups_created.inc(1)

print("Checking whether all the backups are created after their period is over and deleting old unwanted backups")
for table_id, timestamps in backups.items():
table_number = int(table_id.rsplit("_", 1)[-1])
last_timestamp_from_table_number = find_last_timestamp_from_table_number(table_number,
args.periodic_table_duration)

# Checking whether backup is created after last timestamp of tables period.
if last_timestamp_from_table_number > timestamps[-1]:
create_backup(table_id, args)

# Retain only most recent backup for non active table
if table_number != active_table_number and len(timestamps) > 1:
for timestamp in timestamps[:-1]:
delete_backup(table_id, str(timestamp), args)

push_job_finished_metric(args.prom_push_gateway_endpoint, job_ensure_backups, int(time.time() - start_time))

def find_last_timestamp_from_table_number(table_number, periodic_secs):
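    # Table N covers [N * period, (N + 1) * period), so its last valid
    # timestamp is one second before the next table's window begins.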
return ((table_number + 1) * periodic_secs) - 1


def valid_date(s):
try:
        dt = datetime.strptime(s, "%Y-%m-%d")
utc = pytz.timezone('UTC')
return utc.localize(dt)
except ValueError:
msg = "Not a valid date: '{0}'.".format(s)
raise argparse.ArgumentTypeError(msg)


def valid_periodic_duration(s):
try:
return int(s) * 86400
except ValueError:
msg = "Not a valid duration: '{0}'.".format(s)
raise argparse.ArgumentTypeError(msg)


def valid_table_id_prefix(s):
    if not str(s).endswith("_"):
        return str(s) + "_"
    return str(s)

def valid_int(s):
return int(s)


def create_backup(table_id, args):
popen = subprocess.Popen(['bigtable-backup', 'create', '--bigtable-table-id-prefix', table_id,
'--temp-prefix', args.temp_prefix, '--bigtable-project-id', args.bigtable_project_id,
'--bigtable-instance-id', args.bigtable_instance_id, '--destination-path',
args.destination_path],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
popen.wait()
if popen.returncode != 0:
raise Exception("Failed to create backup with error {}".format(b"".join(popen.stdout.readlines()).decode()))
else:
print("Backup created for table {}".format(table_id))


def delete_backup(table_id, timestamp, args):
popen = subprocess.Popen(['bigtable-backup', 'delete-backup', '--bigtable-table-id', table_id,
'--backup-path', args.destination_path, "--backup-timestamp", timestamp],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
popen.wait()
if popen.returncode != 0:
raise Exception("Failed to delete backup with error {}".format(b"".join(popen.stdout.readlines()).decode()))
else:
        print("Backup deleted for table {} at timestamp {}".format(table_id, timestamp))

def push_job_started_metric(endpoint, job):
try:
bigtable_backup_job_last_run_seconds.set_to_current_time()
push_to_gateway(endpoint, job=job, registry=registry)
except Exception as e:
print("failed to push metrics with error {}".format(e))

def push_job_finished_metric(endpoint, job, runtime):
try:
bigtable_backup_job_last_success_seconds.set_to_current_time()
bigtable_backup_job_runtime_seconds.set(runtime)
push_to_gateway(endpoint, job=job, registry=registry)
except Exception as e:
print("failed to push metrics with error {}".format(e))

def push_metrics(endpoint, job):
try:
push_to_gateway(endpoint, job=job, registry=registry)
except Exception as e:
print("failed to push metrics with error {}".format(e))


def main():
parser = argparse.ArgumentParser()
subparser = parser.add_subparsers(help="commands")
parser.add_argument("--bigtable-project-id", required=True,
help="The ID of the GCP project of the Cloud Bigtable instance")
parser.add_argument("--bigtable-instance-id", required=True,
help="The ID of the Cloud Bigtable instance that contains the tables")
parser.add_argument("--bigtable-table-id-prefix", required=True, type=valid_table_id_prefix,
help="Prefix to build IDs of the tables using periodic-table-duration")
parser.add_argument("--destination-path", required=True,
help="GCS path where data should be written. For example, gs://mybucket/somefolder/")
parser.add_argument("--temp-prefix", required=True,
help="Path and filename prefix for writing temporary files. ex: gs://MyBucket/tmp")
parser.add_argument("--periodic-table-duration", required=True, type=valid_periodic_duration,
help="Periodic config set for loki tables in days")
parser.add_argument("--prom-push-gateway-endpoint", default="localhost:9091", help="Endpoint where metrics are to be pushed")

backup_active_periodic_table_parser = subparser.add_parser(job_backup_active_periodic_table,
help="Backup active periodic table")
backup_active_periodic_table_parser.set_defaults(func=backup_active_periodic_table)

ensure_backups_parser = subparser.add_parser(job_ensure_backups,
help="Ensure backups of right tables exist")
    ensure_backups_parser.add_argument('--duration', type=valid_int,
                                       help="Number of most recent days for which backups should exist. "
                                            "Must not be set together with --period-from")
ensure_backups_parser.add_argument('--period-from', type=valid_date, help="Backups should exist starting from the date. Must not be set with --duration")
ensure_backups_parser.add_argument('--period-to', type=valid_date,
default=datetime.utcnow().strftime("%Y-%m-%d"))
ensure_backups_parser.set_defaults(func=ensure_backups)

args = parser.parse_args()

args.func(args)


if __name__ == "__main__":
main()
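
A minimal invocation sketch for the two subcommands (the project, instance, bucket, and prefix values are placeholders, and a 7-day periodic table duration is assumed; the global flags must precede the subcommand):

python3 bigtable-backup.py \
    --bigtable-project-id my-project \
    --bigtable-instance-id my-instance \
    --bigtable-table-id-prefix loki_index \
    --destination-path gs://my-bucket/backups/ \
    --temp-prefix gs://my-bucket/tmp \
    --periodic-table-duration 7 \
    backup-active-periodic-table

python3 bigtable-backup.py <same global flags as above> ensure-backups --duration 30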
2 changes: 2 additions & 0 deletions tools/bigtable-backup/requirements.txt
@@ -0,0 +1,2 @@
prometheus-client==0.7.1
pytz==2019.1