Skip to content

Commit

Permalink
chore: don't use a separate schema for views_and_triggers (#9392)
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasBlaskey authored May 20, 2024
1 parent 893f7f5 commit 5480c57
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 99 deletions.
65 changes: 36 additions & 29 deletions master/internal/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,13 @@ func ensureMigrationUpgrade(tx *pg.Tx) error {
return nil
}

func (db *PgDB) readDBCodeAndCheckIfDifferent(dbCodeDir string) (map[string]string, bool, error) {
files, err := os.ReadDir(dbCodeDir)
func (db *PgDB) readDBCodeAndCheckIfDifferent(
dbCodeDir string,
) (dbCodeFiles map[string]string, hash string, needToUpdateDBCode bool, err error) {
upDir := filepath.Join(dbCodeDir, "up")
files, err := os.ReadDir(upDir)
if err != nil {
return nil, false, fmt.Errorf("reading '%s' directory for database views: %w", dbCodeDir, err)
return nil, "", false, fmt.Errorf("reading '%s' directory for database views: %w", dbCodeDir, err)
}

allCode := ""
Expand All @@ -129,55 +132,61 @@ func (db *PgDB) readDBCodeAndCheckIfDifferent(dbCodeDir string) (map[string]stri
continue
}

filePath := filepath.Join(dbCodeDir, f.Name())
filePath := filepath.Join(upDir, f.Name())
b, err := os.ReadFile(filePath) //nolint: gosec // We trust dbCodeDir.
if err != nil {
return nil, false, fmt.Errorf("reading view definition file '%s': %w", filePath, err)
return nil, "", false, fmt.Errorf("reading view definition file '%s': %w", filePath, err)
}

fileNamesToSQL[f.Name()] = string(b)
allCode += string(b)
}

// I didn't want to get into deciding when to apply database or code or not but integration
// tests make it really hard to not do this.
hash := sha256.Sum256([]byte(allCode))
ourHash := hex.EncodeToString(hash[:])
hashSHA := sha256.Sum256([]byte(allCode))
ourHash := hex.EncodeToString(hashSHA[:])

// Check if the views_and_triggers_hash table exists. If it doesn't return that we need to create db code.
var tableExists bool
if err = db.sql.QueryRow(
"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'views_and_triggers_hash')").
Scan(&tableExists); err != nil {
return nil, false, fmt.Errorf("checking views_and_triggers_hash exists: %w", err)
return nil, "", false, fmt.Errorf("checking views_and_triggers_hash exists: %w", err)
}
if !tableExists {
return fileNamesToSQL, true, nil
return fileNamesToSQL, ourHash, true, nil
}

// Check if our hashes match. If they do we can just return we don't need to do anything.
var databaseHash string
if err := db.sql.QueryRow("SELECT hash FROM views_and_triggers_hash").Scan(&databaseHash); err != nil {
return nil, false, fmt.Errorf("getting hash from views_and_triggers_hash: %w", err)
return nil, "", false, fmt.Errorf("getting hash from views_and_triggers_hash: %w", err)
}
if databaseHash == ourHash {
return fileNamesToSQL, false, nil
return fileNamesToSQL, ourHash, false, nil
}

// Update our hash and return we need to create views and triggers.
if _, err := db.sql.Exec("UPDATE views_and_triggers_hash SET hash = $1", ourHash); err != nil {
return nil, false, fmt.Errorf("updating our database hash: %w", err)
if err := db.dropDBCode(dbCodeDir); err != nil {
return nil, "", false, err
}
return fileNamesToSQL, false, nil

return fileNamesToSQL, ourHash, true, nil
}

func (db *PgDB) addDBCode(fileNamesToSQL map[string]string) error {
func (db *PgDB) addDBCode(fileNamesToSQL map[string]string, hash string) error {
if err := db.withTransaction("determined database views", func(tx *sqlx.Tx) error {
for filePath, sql := range fileNamesToSQL {
if _, err := tx.Exec(sql); err != nil {
return fmt.Errorf("running database view file '%s': %w", filePath, err)
}
}

if _, err := tx.Exec("UPDATE views_and_triggers_hash SET hash = $1", hash); err != nil {
return fmt.Errorf("updating our database hash: %w", err)
}

return nil
}); err != nil {
return fmt.Errorf("adding determined database views: %w", err)
Expand All @@ -186,10 +195,13 @@ func (db *PgDB) addDBCode(fileNamesToSQL map[string]string) error {
return nil
}

func (db *PgDB) dropDBCode() error {
if _, err := db.sql.Exec(`
DROP SCHEMA IF EXISTS determined_code CASCADE;
CREATE SCHEMA determined_code;`); err != nil {
func (db *PgDB) dropDBCode(dbCodeDir string) error {
b, err := os.ReadFile(filepath.Join(dbCodeDir, "down.sql")) //nolint: gosec // We trust dbCodeDir.
if err != nil {
return fmt.Errorf("reading down db code migration: %w", err)
}

if _, err := db.sql.Exec(string(b)); err != nil {
return fmt.Errorf("removing determined database views so they can be created later: %w", err)
}

Expand All @@ -212,18 +224,10 @@ func (db *PgDB) Migrate(
defer cleanup()
}

dbCodeFiles, needToUpdateDBCode, err := db.readDBCodeAndCheckIfDifferent(dbCodeDir)
dbCodeFiles, hash, needToUpdateDBCode, err := db.readDBCodeAndCheckIfDifferent(dbCodeDir)
if err != nil {
return false, err
}
if needToUpdateDBCode {
if err := db.dropDBCode(); err != nil {
return false, err
}
log.Info("database views changed")
} else {
log.Info("database views unchanged, will not updated")
}

// go-pg/migrations uses go-pg/pg connection API, which is not compatible
// with pgx, so we use a one-off go-pg/pg connection.
Expand Down Expand Up @@ -289,9 +293,12 @@ func (db *PgDB) Migrate(

if newVersion >= 20240502203516 { // Only comes up in testing old data.
if needToUpdateDBCode {
if err := db.addDBCode(dbCodeFiles); err != nil {
log.Info("database views changed")
if err := db.addDBCode(dbCodeFiles, hash); err != nil {
return false, err
}
} else {
log.Info("database views unchanged, will not updated")
}
}

Expand Down
36 changes: 0 additions & 36 deletions master/internal/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,48 +175,12 @@ type PgDB struct {
URL string
}

// setSearchPath changes the default search path of the database the handle is
// connected to so that it reads "public,determined_code".
//
// The change is made with ALTER DATABASE, issued from inside a DO block; the
// database name is taken from current_database(). Any error from executing
// the statement is returned wrapped with context; nil is returned on success.
func setSearchPath(sql *sqlx.DB) error {
// In integration tests, multiple processes can be running this code at once, which can lead to
// errors because PostgreSQL ALTER DATABASE can do weird things.
// When the testOnlyDBLock hook is set, call it and release whatever it returns on exit.
if testOnlyDBLock != nil {
cleanup := testOnlyDBLock(sql)
defer cleanup()
}

// The ALTER DATABASE statement text is assembled from current_database() and run
// via execute, so no database name is hard-coded here.
if _, err := sql.Exec(`DO $$
BEGIN
execute 'ALTER DATABASE "'||current_database()||'" SET SEARCH_PATH TO public,determined_code';
END
$$;`); err != nil {
return fmt.Errorf("setting search path on db connection: %w", err)
}

return nil
}

// ConnectPostgres connects to a Postgres database.
func ConnectPostgres(url string) (*PgDB, error) {
return connectPostgres(url, true)
}

func connectPostgres(url string, firstTime bool) (*PgDB, error) {
numTries := 0
for {
sql, err := sqlx.Connect("pgx", url)
if err == nil {
if firstTime {
// On first connection set the search path, close the connection and reconnect.
// There is a little bit of a chicken and egg problem with setting search path.
// We need to actually reconnect for this to take affect.
if err := setSearchPath(sql); err != nil {
return nil, err
}
if err := sql.Close(); err != nil {
return nil, err
}
return connectPostgres(url, false)
}

db := &PgDB{sql: sql, queries: &StaticQueryMap{}, URL: url}
initTheOneBun(db)
return db, nil
Expand Down
7 changes: 3 additions & 4 deletions master/static/views_and_triggers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ If you need to change views / other db code, just change them and the changes wi

If you need to delete views / other db code, just delete them from the file.

If you need to add a view, just make sure you add it to the ```determined_code``` schema (i.e. do ```CREATE VIEW determined_code.test ...```). The exception is triggers, which inherit from the table; therefore, just create them without the schema name. The procedure being executed should still be in the ```determined_code``` schema so that when the ```determined_code``` schema gets dropped, the trigger is dropped by the cascade.
If you need to add a view, add it in a ```.sql``` file in the ```up``` folder and add a matching ``DROP`` statement to the ``down.sql`` file in this directory.

### How does dbviews_and_triggers work?

Every time the Determined master starts up:
Every time the Determined master starts up, we check whether the database views have changed.

- The Postgres schema ```determined_code``` will be dropped if it exists.
- The ``down.sql`` file runs.
- Migrations run as they did before ```determined_code``` was added.
- The Postgres schema ```determined_code``` will be recreated.
- All SQL files in the ``up`` folder of ``views_and_triggers`` will be run in lexicographical order.

### Limitations
Expand Down
34 changes: 34 additions & 0 deletions master/static/views_and_triggers/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Teardown script for the database views, functions, and aggregate managed
-- under views_and_triggers. Anything added in the up/ folder should have a
-- matching DROP statement here.
--
-- Every statement uses IF EXISTS, so the script can be run against a database
-- in which some or all of these objects are missing.

-- proto_checkpoints_view selects from checkpoints_view, so it is dropped first.
DROP VIEW IF EXISTS proto_checkpoints_view;
DROP VIEW IF EXISTS checkpoints_view;

DROP VIEW IF EXISTS trials;

-- Views are dropped before the functions below; validation_metrics calls
-- get_raw_metric and get_signed_metric.
DROP VIEW IF EXISTS steps;
DROP VIEW IF EXISTS validations;
DROP VIEW IF EXISTS validation_metrics;

-- Functions are dropped with CASCADE so that objects depending on them
-- (for example, triggers that execute them) are removed as well.
DROP FUNCTION IF EXISTS abort_checkpoint_delete CASCADE;
DROP FUNCTION IF EXISTS autoupdate_exp_best_trial_metrics CASCADE;
DROP FUNCTION IF EXISTS autoupdate_exp_best_trial_metrics_on_delete CASCADE;
DROP FUNCTION IF EXISTS autoupdate_user_image_deleted CASCADE;
DROP FUNCTION IF EXISTS autoupdate_user_image_modified CASCADE;
DROP FUNCTION IF EXISTS get_raw_metric CASCADE;
DROP FUNCTION IF EXISTS get_signed_metric CASCADE;
DROP FUNCTION IF EXISTS page_info CASCADE;
DROP FUNCTION IF EXISTS proto_time CASCADE;
DROP FUNCTION IF EXISTS retention_timestamp CASCADE;
DROP FUNCTION IF EXISTS set_modified_time CASCADE;
DROP FUNCTION IF EXISTS stream_model_change CASCADE;
DROP FUNCTION IF EXISTS stream_model_notify CASCADE;
DROP FUNCTION IF EXISTS stream_model_seq_modify CASCADE;
DROP FUNCTION IF EXISTS stream_model_version_change CASCADE;
DROP FUNCTION IF EXISTS stream_model_version_change_by_model CASCADE;
DROP FUNCTION IF EXISTS stream_model_version_notify CASCADE;
DROP FUNCTION IF EXISTS stream_model_version_seq_modify CASCADE;
DROP FUNCTION IF EXISTS stream_model_version_seq_modify_by_model CASCADE;
DROP FUNCTION IF EXISTS stream_project_change CASCADE;
DROP FUNCTION IF EXISTS stream_project_notify CASCADE;
DROP FUNCTION IF EXISTS stream_project_seq_modify CASCADE;
DROP FUNCTION IF EXISTS try_float8_cast CASCADE;

-- The aggregate is identified by its argument type, so the signature is spelled out.
DROP AGGREGATE IF EXISTS jsonb_collect(jsonb);
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE VIEW determined_code.checkpoints_view AS
CREATE VIEW checkpoints_view AS
SELECT c.id,
c.uuid,
c.task_id,
Expand All @@ -24,7 +24,7 @@ SELECT c.id,
LEFT JOIN raw_validations v ON ((c.metadata ->> 'steps_completed'::text)::integer) = v.total_batches AND r.id = v.trial_id AND NOT v.archived
LEFT JOIN raw_steps s ON ((c.metadata ->> 'steps_completed'::text)::integer) = s.total_batches AND r.id = s.trial_id AND NOT s.archived;

CREATE VIEW determined_code.proto_checkpoints_view AS
CREATE VIEW proto_checkpoints_view AS
SELECT c.uuid,
c.task_id,
c.allocation_id,
Expand All @@ -36,7 +36,7 @@ SELECT c.uuid,
jsonb_build_object('trial_id', c.trial_id, 'experiment_id', c.experiment_id, 'experiment_config', c.experiment_config, 'hparams', c.hparams, 'training_metrics', jsonb_build_object('avg_metrics', c.training_metrics -> 'avg_metrics'::text, 'batch_metrics', c.training_metrics -> 'batch_metrics'::text), 'validation_metrics', json_build_object('avg_metrics', c.validation_metrics), 'searcher_metric', c.searcher_metric) AS training
FROM checkpoints_view c;

CREATE FUNCTION determined_code.abort_checkpoint_delete() RETURNS trigger
CREATE FUNCTION abort_checkpoint_delete() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE VIEW determined_code.trials AS
CREATE VIEW trials AS
SELECT t.run_id AS id,
r.summary_metrics,
r.summary_metrics_timestamp,
Expand Down Expand Up @@ -27,7 +27,7 @@ CREATE VIEW determined_code.trials AS
FROM trials_v2 t
JOIN runs r ON t.run_id = r.id;

CREATE VIEW determined_code.steps AS
CREATE VIEW steps AS
SELECT raw_steps.trial_id,
raw_steps.end_time,
raw_steps.metrics,
Expand All @@ -38,7 +38,7 @@ CREATE VIEW determined_code.steps AS
FROM raw_steps
WHERE NOT raw_steps.archived;

CREATE VIEW determined_code.validations AS
CREATE VIEW validations AS
SELECT raw_validations.id,
raw_validations.trial_id,
raw_validations.end_time,
Expand All @@ -50,13 +50,13 @@ CREATE VIEW determined_code.validations AS
WHERE NOT raw_validations.archived;


CREATE FUNCTION determined_code.get_raw_metric(v raw_validations, e experiments) RETURNS double precision
CREATE FUNCTION get_raw_metric(v raw_validations, e experiments) RETURNS double precision
LANGUAGE sql STABLE
AS $$
SELECT (v.metrics->'validation_metrics'->>(e.config->'searcher'->>'metric'))::float8
$$;

CREATE FUNCTION determined_code.get_signed_metric(v raw_validations, e experiments) RETURNS double precision
CREATE FUNCTION get_signed_metric(v raw_validations, e experiments) RETURNS double precision
LANGUAGE sql STABLE
AS $$
SELECT get_raw_metric(v, e) * (
Expand All @@ -68,7 +68,7 @@ CREATE FUNCTION determined_code.get_signed_metric(v raw_validations, e experimen
END)
$$;

CREATE VIEW determined_code.validation_metrics AS
CREATE VIEW validation_metrics AS
SELECT v.id,
get_raw_metric(v.*, e.*) AS raw,
get_signed_metric(v.*, e.*) AS signed
Expand All @@ -77,7 +77,7 @@ CREATE VIEW determined_code.validation_metrics AS
raw_validations v
WHERE e.id = t.experiment_id AND t.id = v.trial_id;

CREATE FUNCTION determined_code.autoupdate_exp_best_trial_metrics() RETURNS trigger
CREATE FUNCTION autoupdate_exp_best_trial_metrics() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
Expand All @@ -94,7 +94,7 @@ $$;
CREATE TRIGGER autoupdate_exp_best_trial_metrics AFTER UPDATE OF best_validation_id ON runs FOR EACH ROW EXECUTE PROCEDURE autoupdate_exp_best_trial_metrics();


CREATE FUNCTION determined_code.autoupdate_exp_best_trial_metrics_on_delete() RETURNS trigger
CREATE FUNCTION autoupdate_exp_best_trial_metrics_on_delete() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
Expand Down
Loading

0 comments on commit 5480c57

Please sign in to comment.