History-preserving: Deleting objects creates a brand new transaction with a whiteout row for each deleted object.

Fixes #484.
jamadden committed Jun 28, 2023
1 parent 24a01ca commit a8829ae
Showing 17 changed files with 219 additions and 106 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
@@ -19,6 +19,11 @@
   replaces the ``path`` query parameter.
 - Remove the (local) runtime (install) dependency on
   ``setuptools`` / ``pkg_resources``. This was undeclared.
+- History-preserving storage: Make deleting an object create a new
+  transaction with the new state set to NULL. This leaves the previous
+  revision of the object accessible. Previously, the most recent
+  revision of the object became unavailable. See :pr:`484`, with
+  thanks to Kirill Smelkov.
 
 3.5.0 (2022-09-16)
 ==================
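For context, a minimal sketch of the new behavior at the ZODB level; it is not a test from this repository. It assumes a history-preserving RelStorage opened elsewhere, follows ZODB's ``IExternalGC`` protocol, and the helper name ``delete_and_check`` is hypothetical:

    from ZODB.Connection import TransactionMetaData
    from ZODB.POSException import POSKeyError

    def delete_and_check(storage, oid, oldserial):
        # Externally delete *oid*; the whiteout row lands in a new transaction.
        txn = TransactionMetaData()
        storage.tpc_begin(txn)
        storage.deleteObject(oid, oldserial, txn)
        storage.tpc_vote(txn)
        storage.tpc_finish(txn)

        # The prior revision is still loadable by its serial...
        assert storage.loadSerial(oid, oldserial)
        # ...but the current revision is gone until a pack removes it.
        try:
            storage.load(oid)
        except POSKeyError:
            pass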
4 changes: 3 additions & 1 deletion src/relstorage/adapters/drivers.py
@@ -210,7 +210,9 @@ def __init__(self):
         # raise ProgrammingError for other things, such as failing to get a lock.
         self.illegal_operation_exceptions = (mod.ProgrammingError,)
         self.use_replica_exceptions = (mod.OperationalError,)
-        self.Binary = mod.Binary
+        # Binary must be able to handle None values to produce a None
+        # parameter; not all of them can do this out of the box.
+        self.Binary = lambda d, _Binary=mod.Binary: _Binary(d) if d is not None else None
         self._connect = mod.connect
         self.priority = self.PRIORITY if not PYPY else self.PRIORITY_PYPY
 
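A standalone illustration of the wrapper above; ``FakeBinary`` is a stand-in for a DB-API driver's ``Binary`` that rejects None (real drivers vary). Binding ``mod.Binary`` as a default argument also avoids a per-call attribute lookup:

    def FakeBinary(data):
        # Like some drivers' Binary: raises TypeError when given None.
        return memoryview(data)

    none_safe_binary = lambda d, _Binary=FakeBinary: _Binary(d) if d is not None else None

    assert none_safe_binary(b'abc') == b'abc'
    assert none_safe_binary(None) is None  # passed through as SQL NULL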
5 changes: 3 additions & 2 deletions src/relstorage/adapters/interfaces.py
@@ -1038,8 +1038,9 @@ def deleteObject(cursor, oid_int, tid_int):
         is at *tid_int*), leading all access to *oid_int* in the
         future to throw ``POSKeyError``.
 
-        In history preserving databases, this means to set the state for the object
-        at the transaction to NULL, signifying that it's been deleted. A subsequent
+        In history preserving databases, this means that a new
+        revision of the object with a NULL state is created when the transaction is committed.
+        The NULL state signifies that it's been deleted. A subsequent
         pack operation is required to actually remove these deleted items.
         """
 
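A sketch with illustrative values (not repository code) of what this leaves in ``object_state`` for a single object: the old revision row survives, and the new NULL-state row is the whiteout that makes current loads fail:

    rows = [
        # (zoid, tid, state)
        (42, 5001, b'pickle-v1'),  # old revision, still reachable via loadSerial
        (42, 5002, None),          # whiteout row written by the deleting transaction
    ]

    def load_current(rows, zoid):
        # Loading the current revision finds the NULL state and treats it as deleted.
        tid, state = max((tid, state) for z, tid, state in rows if z == zoid)
        if state is None:
            raise KeyError('POSKeyError: oid %d deleted at tid %d' % (zoid, tid))
        return state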
4 changes: 4 additions & 0 deletions src/relstorage/adapters/mover.py
@@ -264,6 +264,10 @@ def store_temps(self, cursor, state_oid_tid_iter):
         Uses the cursor's ``executemany`` method to store temporary
         objects.
 
+        :param state_oid_tid_iter: An iterable over
+            tuples ``(state, oid_int, tid_int)``. Data may be None
+            to indicate we should store a NULL.
+
         If there is a more optimal way to implement putting objects in
         the database, please do so.
 
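The shape of that iterable, sketched with made-up values; a None state is what produces the NULL (whiteout) row for a deleted object:

    state_oid_tid_iter = [
        (b'pickle-bytes', 1, 5001),  # ordinary stored object
        (None, 2, 5001),             # deleted object: stored as SQL NULL
    ]
    # mover.store_temps(cursor, state_oid_tid_iter)  # cursor from an open store connection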
37 changes: 21 additions & 16 deletions src/relstorage/adapters/packundo.py
@@ -286,6 +286,24 @@ def batch_done_callback(total_count):
         )
         assert num_rows_sent_to_db == marker.reachable_count
 
+    __check_refs_script = """
+    SELECT zoid, to_zoid
+    FROM pack_object
+    INNER JOIN object_ref USING (zoid)
+    WHERE keep = %(TRUE)s
+    AND NOT EXISTS (
+        SELECT 1
+        FROM object_state
+        WHERE object_state.zoid = to_zoid
+        AND object_state.state IS NOT NULL
+        AND object_state.tid = (
+            SELECT MAX(tid)
+            FROM object_state
+            WHERE object_state.zoid = object_ref.to_zoid
+        )
+    )
+    """
+
     def check_refs(self, pack_tid):
         """
         Are there any objects we're *not* going to garbage collect that
@@ -303,18 +321,7 @@ def check_refs(self, pack_tid):
         try:
             with _Progress('execute') as progress:
                 with self._make_ss_load_cursor(load_connection) as ss_load_cursor:
-                    stmt = """
-                    SELECT zoid, to_zoid
-                    FROM pack_object
-                    INNER JOIN object_ref USING (zoid)
-                    WHERE keep = %(TRUE)s
-                    AND NOT EXISTS (
-                        SELECT 1
-                        FROM object_state
-                        WHERE object_state.zoid = to_zoid
-                        AND object_state.state IS NOT NULL
-                    )
-                    """
+                    stmt = self.__check_refs_script
                     self.runner.run_script_stmt(ss_load_cursor, stmt)
                     progress.mark('download')
 
@@ -476,10 +483,8 @@ class HistoryPreservingPackUndo(PackUndo):
     )
 
     _script_delete_object = """
-    UPDATE object_state
-    SET state = NULL,
-        state_size = 0,
-        md5 = ''
+    SELECT 1
+    FROM object_state
     WHERE zoid = %(oid)s
     AND tid = %(tid)s
     """
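A pure-Python restatement (an assumption-level sketch, not repository code) of the ``NOT EXISTS`` predicate in ``__check_refs_script``: with whiteout rows present, a referenced object only counts as existing if its most recent revision has a non-NULL state:

    def reference_is_broken(object_state_rows, to_zoid):
        # object_state_rows: iterable of (zoid, tid, state) tuples.
        revisions = [(tid, state) for zoid, tid, state in object_state_rows
                     if zoid == to_zoid]
        if not revisions:
            return True   # never stored at all
        _, latest_state = max(revisions)
        return latest_state is None  # latest revision is a whiteout

    rows = [(7, 5001, b'x'), (7, 5002, None)]  # deleted at tid 5002
    assert reference_is_broken(rows, 7)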
26 changes: 21 additions & 5 deletions src/relstorage/adapters/postgresql/mover.py
@@ -266,7 +266,7 @@ def __init__(self, table, state_oid_tid_iterable, digester):
         self.state_oid_tid_iterable = state_oid_tid_iterable
         self._iter = iter(state_oid_tid_iterable)
         self._digester = digester
-        if digester and bytes is not str:
+        if digester:
             # On Python 3, this outputs a str, but our protocol needs bytes
             self._digester = lambda s: digester(s).encode("ascii")
         if self._digester:
@@ -337,13 +337,29 @@ def readinto(self, buf):
     def __len__(self):
         return len(self.state_oid_tid_iterable)
 
+    def _next_row(self): # pylint:disable=method-hidden
+        return next(self._iter)
+
     def _read_one_tuple_md5(self,
                             buf,
                             _pack_into=WITH_SUM.pack_into,
                             _header_size=WITH_SUM.size,
                             _blank_header=bytearray(WITH_SUM.size)):
 
-        data, oid_int, tid_int = next(self._iter)
+        data, oid_int, tid_int = self._next_row()
+        if data is None:
+            # A deleted object. These should be quite rare relative
+            # to everything else. This won't have an MD5, so
+            # it can take the same code as the normal no-md5-path,
+            # as long as we arrange for it to get the right data.
+            # (A push-back iterator would simplify this.)
+            self._next_row = lambda: (data, oid_int, tid_int)
+            try:
+                self._read_one_tuple_no_md5(buf)
+            finally:
+                del self._next_row
+            return
+
         len_data = len(data)
         md5 = self._digester(data)
         offset = len(buf)
@@ -363,8 +379,8 @@ def _read_one_tuple_no_md5(self,
                                _pack_into=NO_SUM.pack_into,
                                _header_size=NO_SUM.size,
                                _blank_header=bytearray(NO_SUM.size)):
-        data, oid_int, tid_int = next(self._iter)
-        len_data = len(data)
+        data, oid_int, tid_int = self._next_row()
+        len_data = len(data) if data is not None else -1
         offset = len(buf)
         buf.extend(_blank_header)
         _pack_into(
@@ -375,4 +391,4 @@
             -1,
             len_data
         )
-        buf.extend(data)
+        buf.extend(data or b'')
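A minimal standalone demo of the push-back trick used in ``_read_one_tuple_md5``: an instance attribute temporarily shadows the class's ``_next_row`` so another code path re-reads the same row, and ``del`` restores the method. The class and row values here are invented for illustration:

    class RowReader:
        def __init__(self, rows):
            self._iter = iter(rows)

        def _next_row(self):  # shadowed below when a row is pushed back
            return next(self._iter)

        def read_one(self):
            data, oid_int, tid_int = self._next_row()
            if data is None:
                # Push the row back: the instance attribute shadows the method.
                self._next_row = lambda: (data, oid_int, tid_int)
                try:
                    self._read_deleted()
                finally:
                    del self._next_row  # restore the class-level method
                return 'deleted'
            return 'stored'

        def _read_deleted(self):
            data, _oid_int, _tid_int = self._next_row()
            assert data is None

    reader = RowReader([(None, 2, 5001), (b'x', 1, 5001)])
    assert reader.read_one() == 'deleted'
    assert reader.read_one() == 'stored'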
2 changes: 2 additions & 0 deletions src/relstorage/cache/memcache_client.py
@@ -117,6 +117,8 @@ def set_all_for_tid(self, tid_int, state_oid_iter):
         send_size = 0
         to_send = {}
         for state, oid_int, _ in state_oid_iter:
+            if not state:
+                continue
             length = len(state)
             cachekey = (oid_int, tid_int)
             item_size = length + len(cachekey)
3 changes: 2 additions & 1 deletion src/relstorage/cache/tests/test_cache_stats.py
@@ -30,7 +30,8 @@
 from relstorage.cache.storage_cache import StorageCache
 
 from relstorage.cache.tests import MockOptions, MockAdapter
-from relstorage.storage.tpc.temporary_storage import TemporaryStorage
+
+from ...storage.tpc.temporary_storage import HFTPCTemporaryStorage as TemporaryStorage
 
 def _build_history():
     random = random2.Random(42)
4 changes: 2 additions & 2 deletions src/relstorage/cache/tests/test_storage_cache.py
@@ -24,7 +24,7 @@
 from nti.testing.matchers import validly_provides
 
 from relstorage.tests import TestCase
-from relstorage.storage.tpc.temporary_storage import TemporaryStorage
+from ...storage.tpc.temporary_storage import HFTPCTemporaryStorage as TemporaryStorage
 
 from ..interfaces import IStorageCache
 from . import MockOptionsWithFakeMemcache as MockOptionsWithFakeCache
@@ -248,7 +248,7 @@ def test_store_temp(self):
         self.assertEqual(dict(temp_storage.stored_oids),
                          {1: (3, 6, 0), 2: (6, 9, 0)})
         self.assertEqual(temp_storage.max_stored_oid, 2)
-        f = temp_storage._queue
+        f = temp_storage._buffer
         f.seek(0)
         self.assertEqual(f.read(), b'abcdefghi')
         c.after_tpc_finish(p64(3), temp_storage)
10 changes: 8 additions & 2 deletions src/relstorage/storage/tpc/__init__.py
@@ -43,7 +43,8 @@
 from ..._util import Lazy as BaseLazy
 from ..._util import get_boolean_from_environ
 
-from .temporary_storage import TemporaryStorage
+from .temporary_storage import HPTPCTemporaryStorage
+from .temporary_storage import HFTPCTemporaryStorage
 
 logger = logging.getLogger(__name__)
 
@@ -66,6 +67,10 @@
 # pylint:disable=protected-access
 
 class _LazyResource(BaseLazy):
+    """
+    A Lazy property that supports resource management of the
+    returned value.
+    """
 
     # If not None, a callable ``(storage, resource, force)``
     # that aborts the *resource*, possibly forcefully (*force*).
@@ -209,7 +214,8 @@ def adapter(self):
 
     @_LazyResource
     def temp_storage(self):
-        return TemporaryStorage()
+        factory = HPTPCTemporaryStorage if self._storage.keep_history else HFTPCTemporaryStorage
+        return factory()
 
     @temp_storage.cleaner
     def temp_storage(self, _storage, temp_storage, _force=None):
3 changes: 2 additions & 1 deletion src/relstorage/storage/tpc/begin.py
@@ -177,7 +177,7 @@ def _invalidated_oids(self, *oid_bytes):
 
     def deleteObject(self, oid, oldserial, transaction):
         """
-        This method operates directly against the ``object_state`` table;
+        In history-free mode, this method operates directly against the ``object_state`` table;
         as such, it immediately takes out locks on that table.
 
         This method is only expected to be called when performing
@@ -214,6 +214,7 @@ def deleteObject(self, oid, oldserial, transaction):
         tid_int = bytes8_to_int64(oldserial)
         self.shared_state.cache.remove_cached_data(oid_int, tid_int)
 
+        self.shared_state.temp_storage.delete_object(oid_int, tid_int)
         # We delegate the actual operation to the adapter's packundo,
         # just like native pack
         cursor = self.shared_state.store_connection.cursor
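Condensed into one place, the history-preserving delete path after this change looks roughly like the following sketch; the names mirror the diff, but the surrounding class, locking, and error handling are elided:

    def deleteObject_hp(shared_state, oid_int, tid_int):
        # 1. Drop any cached copy of the revision being deleted.
        shared_state.cache.remove_cached_data(oid_int, tid_int)
        # 2. Queue a whiteout (NULL state) in this transaction's temporary
        #    storage; it is committed like any other stored object.
        shared_state.temp_storage.delete_object(oid_int, tid_int)
        # 3. The adapter's packundo delete script now only verifies that
        #    the (zoid, tid) row exists, using the store connection's cursor.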