Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use schemas in Postgres #1866

Merged
merged 1 commit into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,12 @@ object Databases extends Logging {
readOnlyUser_opt.foreach { readOnlyUser =>
PgUtils.inTransaction { connection =>
using(connection.createStatement()) { statement =>
logger.info(s"granting read-only access to user=$readOnlyUser")
statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA public TO $readOnlyUser")
val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: Nil
schemas.foreach { schema =>
logger.info(s"granting read-only access to user=$readOnlyUser schema=$schema")
statement.executeUpdate(s"GRANT USAGE ON SCHEMA $schema TO $readOnlyUser")
statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA $schema TO $readOnlyUser")
}
}
}
}
Expand Down
76 changes: 47 additions & 29 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
import ExtendedResultSet._

val DB_NAME = "audit"
val CURRENT_VERSION = 6
val CURRENT_VERSION = 7

case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long)

Expand All @@ -62,32 +62,50 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("ALTER TABLE channel_errors ALTER COLUMN timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + timestamp * interval '1 millisecond'")
}

def migration67(statement: Statement): Unit = {
statement.executeUpdate("CREATE SCHEMA audit")
statement.executeUpdate("ALTER TABLE sent SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE received SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE relayed SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE relayed_trampoline SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE network_fees SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE channel_events SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE channel_errors SET SCHEMA audit")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE SCHEMA audit")

statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_payment_hash_idx ON relayed(payment_hash)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
statement.executeUpdate("CREATE INDEX network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX channel_events_timestamp_idx ON channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX channel_errors_timestamp_idx ON channel_errors(timestamp)")
statement.executeUpdate("CREATE TABLE audit.sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")

statement.executeUpdate("CREATE TABLE audit.channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON audit.sent(timestamp)")
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON audit.received(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_timestamp_idx ON audit.relayed(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_payment_hash_idx ON audit.relayed(payment_hash)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_timestamp_idx ON audit.relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_payment_hash_idx ON audit.relayed_trampoline(payment_hash)")
statement.executeUpdate("CREATE INDEX network_fees_timestamp_idx ON audit.network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX channel_events_timestamp_idx ON audit.channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX channel_errors_timestamp_idx ON audit.channel_errors(timestamp)")
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
migration56(statement)
migration67(statement)
case Some(v@5) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration56(statement)
migration67(statement)
case Some(v@6) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration67(statement)
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand All @@ -97,7 +115,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.channelId.toHex)
statement.setString(2, e.remoteNodeId.value.toHex)
statement.setLong(3, e.capacity.toLong)
Expand All @@ -112,7 +130,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
e.parts.foreach(p => {
statement.setLong(1, p.amount.toLong)
statement.setLong(2, p.feesPaid.toLong)
Expand All @@ -133,7 +151,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO received VALUES (?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.received VALUES (?, ?, ?, ?)")) { statement =>
e.parts.foreach(p => {
statement.setLong(1, p.amount.toLong)
statement.setString(2, e.paymentHash.toHex)
Expand All @@ -153,7 +171,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) =>
using(pg.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
statement.setString(1, e.paymentHash.toHex)
statement.setLong(2, nextTrampolineAmount.toLong)
statement.setString(3, nextTrampolineNodeId.value.toHex)
Expand All @@ -164,7 +182,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
}
for (p <- payments) {
using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.paymentHash.toHex)
statement.setLong(2, p.amount.toLong)
statement.setString(3, p.channelId.toHex)
Expand All @@ -179,7 +197,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.channelId.toHex)
statement.setString(2, e.remoteNodeId.value.toHex)
statement.setString(3, e.tx.txid.toHex)
Expand All @@ -193,7 +211,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
val (errorName, errorMessage) = e.error match {
case LocalError(t) => (t.getClass.getSimpleName, t.getMessage)
case RemoteError(error) => ("remote", error.toAscii)
Expand All @@ -211,7 +229,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def listSent(from: Long, to: Long): Seq[PaymentSent] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM sent WHERE timestamp BETWEEN ? AND ?")) { statement =>
using(pg.prepareStatement("SELECT * FROM audit.sent WHERE timestamp BETWEEN ? AND ?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery()
Expand Down Expand Up @@ -241,7 +259,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def listReceived(from: Long, to: Long): Seq[PaymentReceived] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM received WHERE timestamp BETWEEN ? AND ?")) { statement =>
using(pg.prepareStatement("SELECT * FROM audit.received WHERE timestamp BETWEEN ? AND ?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery()
Expand All @@ -262,7 +280,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] =
inTransaction { pg =>
val trampolineByHash = using(pg.prepareStatement("SELECT * FROM relayed_trampoline WHERE timestamp BETWEEN ? and ?")) { statement =>
val trampolineByHash = using(pg.prepareStatement("SELECT * FROM audit.relayed_trampoline WHERE timestamp BETWEEN ? and ?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery()
Expand All @@ -273,7 +291,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
trampolineByHash + (paymentHash -> (amount, nodeId))
}
}
val relayedByHash = using(pg.prepareStatement("SELECT * FROM relayed WHERE timestamp BETWEEN ? and ?")) { statement =>
val relayedByHash = using(pg.prepareStatement("SELECT * FROM audit.relayed WHERE timestamp BETWEEN ? and ?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery()
Expand Down Expand Up @@ -308,7 +326,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def listNetworkFees(from: Long, to: Long): Seq[NetworkFee] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM network_fees WHERE timestamp BETWEEN ? and ? ORDER BY timestamp")) { statement =>
using(pg.prepareStatement("SELECT * FROM audit.network_fees WHERE timestamp BETWEEN ? and ? ORDER BY timestamp")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery().map { rs =>
Expand Down
Loading