Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for Directed Reads #1000

Merged
merged 26 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2f70017
changes
asthamohta Feb 23, 2023
88c70ce
changes
asthamohta Feb 23, 2023
69c3f6e
docs
asthamohta Feb 23, 2023
aa9f18c
docs
asthamohta Feb 23, 2023
5a0eadc
linting
asthamohta Feb 23, 2023
6b2ca49
Merge branch 'main' into direct-read-main-code
harshachinta Sep 16, 2023
e6ae0de
Merge branch 'main' into direct-read-main-code
harshachinta Nov 19, 2023
3036bb6
feat(spanner): remove client side validations for directed read options
harshachinta Nov 20, 2023
0e68229
feat(spanner): update the auto_failover_disabled field
harshachinta Nov 20, 2023
dad4a2d
feat(spanner): update unit tests
harshachinta Nov 20, 2023
c18e0a3
feat(spanner): update test
harshachinta Nov 22, 2023
f06a7bc
Merge branch 'main' into direct-read-main-code
harshachinta Dec 8, 2023
76cbdac
feat(spanner): update documentation
harshachinta Dec 8, 2023
71d728d
feat(spanner): add system test to validate exception in case of RW tr…
harshachinta Dec 8, 2023
d349418
feat(spanner): update unit test
harshachinta Dec 8, 2023
34026ca
Merge branch 'main' into direct-read-main-code
harshachinta Dec 8, 2023
927da67
feat(spanner): add dro for batchsnapshot and update system tests
harshachinta Dec 9, 2023
2dfc406
feat(spanner): fix unit tests for batchsnapshot
harshachinta Dec 9, 2023
f9af1d8
feat(spanner): add unit tests for partition read and query
harshachinta Dec 9, 2023
54a1e16
Merge branch 'main' into direct-read-main-code
harshachinta Dec 15, 2023
366f2f8
Merge branch 'main' into direct-read-main-code
harshachinta Dec 27, 2023
426143a
Merge branch 'main' into direct-read-main-code
harshachinta Jan 7, 2024
12157b4
feat(spanner): lint fixes
harshachinta Jan 8, 2024
ce21ed8
feat(spanner): code refactor remove TransactionType
harshachinta Jan 8, 2024
37abf81
feat(spanner): comment refactor
harshachinta Jan 8, 2024
7162a42
feat(spanner): remove comments
harshachinta Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/spanner_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from .types.spanner import CommitRequest
from .types.spanner import CreateSessionRequest
from .types.spanner import DeleteSessionRequest
from .types.spanner import DirectedReadOptions
from .types.spanner import ExecuteBatchDmlRequest
from .types.spanner import ExecuteBatchDmlResponse
from .types.spanner import ExecuteSqlRequest
Expand Down Expand Up @@ -108,6 +109,7 @@
"CommitResponse",
"CreateSessionRequest",
"DeleteSessionRequest",
"DirectedReadOptions",
"ExecuteBatchDmlRequest",
"ExecuteBatchDmlResponse",
"ExecuteSqlRequest",
Expand Down
30 changes: 30 additions & 0 deletions google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ class Client(ClientWithProject):
disable leader aware routing. Disabling leader aware routing would
route all requests in RW/PDML transactions to the closest region.

:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Client options used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.

:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
and ``admin`` are :data:`True`
"""
Expand All @@ -139,6 +145,7 @@ def __init__(
client_options=None,
query_options=None,
route_to_leader_enabled=True,
directed_read_options=None,
):
self._emulator_host = _get_spanner_emulator_host()

Expand Down Expand Up @@ -179,6 +186,7 @@ def __init__(
warnings.warn(_EMULATOR_HOST_HTTP_SCHEME)

self._route_to_leader_enabled = route_to_leader_enabled
self._directed_read_options = directed_read_options

@property
def credentials(self):
Expand Down Expand Up @@ -260,6 +268,17 @@ def route_to_leader_enabled(self):
"""
return self._route_to_leader_enabled

@property
def directed_read_options(self):
"""Getter for directed_read_options.

:rtype:
:class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:returns: The directed_read_options for the client.
"""
return self._directed_read_options

def copy(self):
"""Make a copy of this client.

Expand Down Expand Up @@ -383,3 +402,14 @@ def list_instances(self, filter_="", page_size=None):
request=request, metadata=metadata
)
return page_iter

@directed_read_options.setter
def directed_read_options(self, directed_read_options):
"""Sets directed_read_options for the client
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: Client options used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.
"""
self._directed_read_options = directed_read_options
17 changes: 17 additions & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def __init__(
self._route_to_leader_enabled = self._instance._client.route_to_leader_enabled
self._enable_drop_protection = enable_drop_protection
self._reconciling = False
self._directed_read_options = self._instance._client.directed_read_options

if pool is None:
pool = BurstyPool(database_role=database_role)
Expand Down Expand Up @@ -1226,6 +1227,7 @@ def generate_read_batches(
partition_size_bytes=None,
max_partitions=None,
data_boost_enabled=False,
directed_read_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
Expand Down Expand Up @@ -1265,6 +1267,12 @@ def generate_read_batches(
(Optional) If this is for a partitioned read and this field is
set ``true``, the request will be executed via offline access.

:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
for ReadRequests that indicates which replicas
or regions should be used for non-transactional reads.

:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.

Expand Down Expand Up @@ -1293,6 +1301,7 @@ def generate_read_batches(
"keyset": keyset._to_dict(),
"index": index,
"data_boost_enabled": data_boost_enabled,
"directed_read_options": directed_read_options,
}
for partition in partitions:
yield {"partition": partition, "read": read_info.copy()}
Expand Down Expand Up @@ -1337,6 +1346,7 @@ def generate_query_batches(
max_partitions=None,
query_options=None,
data_boost_enabled=False,
directed_read_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
Expand Down Expand Up @@ -1388,6 +1398,12 @@ def generate_query_batches(
(Optional) If this is for a partitioned query and this field is
set ``true``, the request will be executed via offline access.

:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
for ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional queries.

:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.

Expand All @@ -1412,6 +1428,7 @@ def generate_query_batches(
query_info = {
"sql": sql,
"data_boost_enabled": data_boost_enabled,
"directed_read_options": directed_read_options,
}
if params:
query_info["params"] = params
Expand Down
26 changes: 26 additions & 0 deletions google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def read(
partition=None,
request_options=None,
data_boost_enabled=False,
directed_read_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
Expand Down Expand Up @@ -224,6 +225,12 @@ def read(
``partition_token``, the API will return an
``INVALID_ARGUMENT`` error.

:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.

:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.

Expand Down Expand Up @@ -253,6 +260,11 @@ def read(
if self._read_only:
# Transaction tags are not supported for read only transactions.
request_options.transaction_tag = None
if (
directed_read_options is None
and database._directed_read_options is not None
):
directed_read_options = database._directed_read_options
elif self.transaction_tag is not None:
request_options.transaction_tag = self.transaction_tag

Expand All @@ -266,6 +278,7 @@ def read(
partition_token=partition,
request_options=request_options,
data_boost_enabled=data_boost_enabled,
directed_read_options=directed_read_options,
)
restart = functools.partial(
api.streaming_read,
Expand Down Expand Up @@ -322,6 +335,7 @@ def execute_sql(
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
data_boost_enabled=False,
directed_read_options=None,
):
"""Perform an ``ExecuteStreamingSql`` API request.

Expand Down Expand Up @@ -379,6 +393,12 @@ def execute_sql(
``partition_token``, the API will return an
``INVALID_ARGUMENT`` error.

:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.

:raises ValueError:
for reuse of single-use snapshots, or if a transaction ID is
already pending for multiple-use snapshots.
Expand Down Expand Up @@ -419,6 +439,11 @@ def execute_sql(
if self._read_only:
# Transaction tags are not supported for read only transactions.
request_options.transaction_tag = None
if (
directed_read_options is None
and database._directed_read_options is not None
):
directed_read_options = database._directed_read_options
elif self.transaction_tag is not None:
request_options.transaction_tag = self.transaction_tag

Expand All @@ -433,6 +458,7 @@ def execute_sql(
query_options=query_options,
request_options=request_options,
data_boost_enabled=data_boost_enabled,
directed_read_options=directed_read_options,
)
restart = functools.partial(
api.execute_streaming_sql,
Expand Down
76 changes: 76 additions & 0 deletions samples/samples/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from google.cloud import spanner
from google.cloud.spanner_admin_instance_v1.types import spanner_instance_admin
from google.cloud.spanner_v1 import param_types
from google.cloud.spanner_v1 import DirectedReadOptions
from google.type import expr_pb2
from google.iam.v1 import policy_pb2
from google.cloud.spanner_v1.data_types import JsonObject
Expand Down Expand Up @@ -2723,6 +2724,78 @@ def drop_sequence(instance_id, database_id):

# [END spanner_drop_sequence]


def directed_read_options(
instance_id,
database_id,
):
"""
Shows how to run an execute sql request with directed read options.
Only one of exclude_replicas or include_replicas can be set
Each accepts a list of replicaSelections which contains location and type
* `location` - The location must be one of the regions within the
multi-region configuration of your database.
* `type_` - The type of the replica
Some examples of using replica_selectors are:
* `location:us-east1` --> The "us-east1" replica(s) of any available type
will be used to process the request.
* `type:READ_ONLY` --> The "READ_ONLY" type replica(s) in nearest
available location will be used to process the
request.
* `location:us-east1 type:READ_ONLY` --> The "READ_ONLY" type replica(s)
in location "us-east1" will be used to process
the request.
include_replicas also contains an option for auto_failover_disabled which when set
Spanner will not route requests to a replica outside the
include_replicas list when all the specified replicas are unavailable
or unhealthy. The default value is `false`
"""
# [START spanner_directed_read]
# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"

directed_read_options_for_client = {
"exclude_replicas": {
"replica_selections": [
{
"location": "us-east4",
},
],
},
}

# directed_read_options can be set at client level and will be used in all
# read-only transaction requests
spanner_client = spanner.Client(
directed_read_options=directed_read_options_for_client
)
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

directed_read_options_for_request = {
"include_replicas": {
"replica_selections": [
{
"type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY,
},
],
"auto_failover_disabled": True,
},
}

with database.snapshot() as snapshot:
# Read rows while passing directed_read_options directly to the query.
# These will override the options passed at Client level.
results = snapshot.execute_sql(
"SELECT SingerId, AlbumId, AlbumTitle FROM Albums",
directed_read_options=directed_read_options_for_request,
)

for row in results:
print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
# [END spanner_directed_read]


if __name__ == "__main__": # noqa: C901
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
Expand Down Expand Up @@ -2862,6 +2935,7 @@ def drop_sequence(instance_id, database_id):
"--database_role", default="new_parent"
)
enable_fine_grained_access_parser.add_argument("--title", default="condition title")
subparsers.add_parser("directed_read_options", help=directed_read_options.__doc__)

args = parser.parse_args()

Expand Down Expand Up @@ -2993,3 +3067,5 @@ def drop_sequence(instance_id, database_id):
args.database_role,
args.title,
)
elif args.command == "directed_read_options":
directed_read_options(args.instance_id, args.database_id)
7 changes: 7 additions & 0 deletions samples/samples/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -852,3 +852,10 @@ def test_drop_sequence(capsys, instance_id, bit_reverse_sequence_database):
"Altered Customers table to drop DEFAULT from CustomerId column and dropped the Seq sequence on database"
in out
)


@pytest.mark.dependency(depends=["insert_data"])
def test_directed_read_options(capsys, instance_id, sample_database):
snippets.directed_read_options(instance_id, sample_database.database_id)
out, _ = capsys.readouterr()
assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out
Loading