Skip to content
Merged
7 changes: 6 additions & 1 deletion .github/workflows/tests_role_ravendb_node.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,12 @@ jobs:

- name: Run modules scenarios
working-directory: roles/ravendb_node
run: molecule test -s plugins_all_unsecured
run: molecule test -s plugins-unsecured

# todo: enable once we put a license file in the repo
# - name: Run modules scenarios nodes
# working-directory: roles/ravendb_node
# run: molecule test -s plugins-unsecured-nodes

- name: Set up .NET environment variables
run: |
Expand Down
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ The full changelog is maintained in [changelogs/changelog.yml](./changelogs/chan

### Added
- Database placement on specific nodes via `topology_members` in `ravendb.ravendb.database`.
- Database topology reconciliation: add/remove nodes from a database's topology.
- Index deployment mode support (`rolling`, `parallel`) in `ravendb.ravendb.index`.
- Per-index configuration reconciliation via `index_configuration` in `ravendb.ravendb.index`.

Expand Down
1 change: 0 additions & 1 deletion changelogs/changelog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ releases:
- added:
- Database placement controls in `ravendb.ravendb.database`:
- Fixed placement via `topology_members` to deploy on specific node tags.
- Membership reconciliation to add/remove nodes from a database topology.
- Index deployment mode in `ravendb.ravendb.index` (rolling/parallel).
- Per-index configuration reconciliation in `ravendb.ravendb.index` via `index_configuration`.
- changed:
Expand Down
28 changes: 4 additions & 24 deletions plugins/module_utils/core/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def db_no_changes(base):
return "{} No changes.".format(base)


def rf_required_on_create():
return "replication_factor is required when creating a database."


def settings_applied(prefix, keys):
ks = ", ".join(sorted(keys)) if not isinstance(keys, str) else keys
return "{} Applied settings ({}) and reloaded.".format(prefix, ks)
Expand Down Expand Up @@ -176,27 +180,3 @@ def node_added(tag, node_type):

def failed_add_node(tag, error):
return "Failed to add node '{}': {}".format(tag, error)


def _fmt_tags(tags):
return ", ".join(tags) if tags else ""


def members_would_reconcile(db, to_add, to_remove):
if to_add and to_remove:
return "Database '{}' would reconcile members: add [{}]; remove [{}].".format(db, _fmt_tags(to_add), _fmt_tags(to_remove))
if to_add:
return "Database '{}' would add members: [{}].".format(db, _fmt_tags(to_add))
if to_remove:
return "Database '{}' would remove members: [{}].".format(db, _fmt_tags(to_remove))
return "Database '{}' membership already matches.".format(db)


def members_reconciled(db, to_add, to_remove):
if to_add and to_remove:
return "Database '{}' reconciled members: added [{}]; removed [{}].".format(db, _fmt_tags(to_add), _fmt_tags(to_remove))
if to_add:
return "Database '{}' added members: [{}].".format(db, _fmt_tags(to_add))
if to_remove:
return "Database '{}' removed members: [{}].".format(db, _fmt_tags(to_remove))
return "Database '{}' membership already matched.".format(db)
9 changes: 8 additions & 1 deletion plugins/module_utils/core/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,20 @@ def validate_replication_factor(factor):
return True, None


def validate_replication_factor_optional(factor):
"""Accepts None or a positive integer."""
if factor is None:
return True, None
return validate_replication_factor(factor)


def validate_topology_members(members, replication_factor):
"""Validate that topology_members is a list of tags with length == replication_factor."""
if not members:
return True, None
if not isinstance(members, list) or not all(isinstance(m, str) for m in members):
return False, "topology_members must be a list of strings."
if len(members) != replication_factor:
if replication_factor is not None and len(members) != replication_factor:
return False, "topology_members length ({}) must equal replication_factor ({}).".format(len(members), replication_factor)
return True, None

Expand Down
2 changes: 1 addition & 1 deletion plugins/module_utils/dto/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self,


class DatabaseSpec(object):
def __init__(self, url, name, replication_factor=1, settings=None, encryption=None, members=None):
def __init__(self, url, name, replication_factor=None, settings=None, encryption=None, members=None):
if settings is None:
settings = {}
if encryption is None:
Expand Down
53 changes: 22 additions & 31 deletions plugins/module_utils/reconcilers/database_reconciler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,29 @@ def ensure_present(self, spec, tls, check_mode):
existing_databases = dbs.list_databases(self.ctx)
created = False

if spec.members:
wanted = list(dict.fromkeys(spec.members))
if len(wanted) != spec.replication_factor:
return ModuleResult.error(msg="topology_members length ({}) must equal replication_factor ({}).".format(len(wanted), spec.replication_factor))
try:
cluster_tags = set(collect_tags(fetch_topology(self.ctx)))
except Exception as e:
return ModuleResult.error(msg="Failed to fetch cluster topology: {}".format(str(e)))
if spec.name not in existing_databases:
if spec.replication_factor is None:
return ModuleResult.error(msg=msg.rf_required_on_create())

missing = [t for t in wanted if t not in cluster_tags]
if missing:
return ModuleResult.error(msg="Unknown node tags in topology_members: {}".format(", ".join(missing)))
if spec.members:
wanted = list(dict.fromkeys(spec.members))
if len(wanted) != spec.replication_factor:
return ModuleResult.error(
msg="topology_members length ({}) must equal replication_factor ({}).".format(
len(wanted), spec.replication_factor
)
)
try:
cluster_tags = set(collect_tags(fetch_topology(self.ctx)))
except Exception as e:
return ModuleResult.error(msg="Failed to fetch cluster topology: {}".format(str(e)))

spec.members = wanted
missing = [t for t in wanted if t not in cluster_tags]
if missing:
return ModuleResult.error(msg="Unknown node tags in topology_members: {}".format(", ".join(missing)))

spec.members = wanted

if spec.name not in existing_databases:
if spec.encryption.enabled:
if check_mode:
return ModuleResult.ok(msg=msg.db_would_create(spec.name, encrypted=True), changed=True)
Expand All @@ -68,25 +75,9 @@ def ensure_present(self, spec, tls, check_mode):
# toggling between encrypted db and regular db is forbidden
return ModuleResult.error(msg=msg.encryption_mismatch(spec.name, actual_flag, spec.encryption.enabled))
base_msg = msg.db_exists(spec.name)

if spec.members:
try:
changed_m, to_add, to_remove = dbs.reconcile_membership(
self.ctx,
spec.name,
record,
spec.members,
encrypted=actual_flag,
enc_key_path=spec.encryption.key_path,
tls=tls,
check_mode=check_mode,
)
except Exception as e:
return ModuleResult.error(msg=str(e))
if changed_m:
if check_mode:
return ModuleResult.ok(msg=msg.members_would_reconcile(spec.name, to_add, to_remove), changed=True)
base_msg = msg.members_reconciled(spec.name, to_add, to_remove)
created = True
return ModuleResult.error(msg="topology_members is only supported on database creation; modifying an existing database topology is blocked.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return ModuleResult.error(msg="topology_members is only supported on database creation; modifying an existing database topology is blocked.")
return ModuleResult.error(msg="topology_members is only supported on database creation; modifying an existing database topology is not supported.")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


if spec.settings:
current = setsvc.get_current(self.ctx, spec.name)
Expand Down
170 changes: 5 additions & 165 deletions plugins/module_utils/services/database_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible_collections.ravendb.ravendb.plugins.module_utils.core.tls import TLSConfig
from ansible_collections.ravendb.ravendb.plugins.module_utils.services import encryption_service as encsvc


def list_databases(ctx, start=0, max=128):
Expand All @@ -29,12 +28,12 @@ def create_database(ctx, db_name, replication_factor, encrypted, members=None, t
import requests
body = {
"DatabaseName": db_name,
"ReplicationFactor": len(members),
"ReplicationFactor": replication_factor,
"Encrypted": bool(encrypted),
"DisableDynamicNodesDistribution": True,
"Topology": {
"Members": list(members),
"ReplicationFactor": len(members),
"ReplicationFactor": replication_factor,
"DynamicNodesDistribution": False,
},
}
Expand All @@ -53,165 +52,6 @@ def create_database(ctx, db_name, replication_factor, encrypted, members=None, t
ctx.maintenance_server().send(CreateDatabaseOperation(rec, replication_factor))


def delete_database(
ctx,
db_name,
from_nodes=None,
hard_delete=None,
tls=None,
time_to_wait_sec=None,
):
if not from_nodes and hard_delete is None and time_to_wait_sec is None:
from ravendb.serverwide.operations.common import DeleteDatabaseOperation
ctx.maintenance_server().send(DeleteDatabaseOperation(db_name))
return

import requests
base = ctx.store.urls[0].rstrip("/")
url = base + "/admin/databases" # todo: move to client operation when it will be supported
params = {}
if hard_delete is True:
params["hardDelete"] = "true"
elif hard_delete is False:
params["hardDelete"] = "false"
if time_to_wait_sec is not None:
params["timeToWaitForConfirmationInSec"] = str(int(time_to_wait_sec))

payload = {"DatabaseNames": [db_name]}
if from_nodes:
payload["FromNodes"] = list(from_nodes)

cert, verify = (tls or TLSConfig()).to_requests_tuple()
r = requests.delete(url, params=params, json=payload, cert=cert, verify=verify, timeout=30)
r.raise_for_status()


def add_member_if_needed(ctx, db_name, node_tag, tls=None):
try:
rec = get_record(ctx, db_name)
if node_tag in set(_extract_members_from_record(rec)):
return False
except Exception:
pass

from ravendb.serverwide.operations.common import AddDatabaseNodeOperation
try:
ctx.maintenance_server().send(AddDatabaseNodeOperation(db_name, node_tag))
return True
except Exception as e:
try:
rec = get_record(ctx, db_name)
if node_tag in set(_extract_members_from_record(rec)):
return False
except Exception:
pass
if _already_member_error(e):
return False
raise


def _already_member_error(e):
s = str(e).lower()
return ("already part of" in s or "already in its topology" in s or "already part of it" in s)


def _extract_members_from_record(record):
topo = (
getattr(record, "topology", None) or getattr(record, "Topology", None)
if record and not isinstance(record, dict)
else (record or {}).get("topology") or (record or {}).get("Topology")
)
if not topo:
return []

def _group(name):
if isinstance(topo, dict):
return topo.get(name) or topo.get(name.capitalize())
return getattr(topo, name, None) or getattr(topo, name.capitalize(), None)

tags = []
for name in ("members", "promotables", "rehabs"):
tags.extend(_pluck_tags(_group(name)))

# filter out falsy values and keep order unique
return list(dict.fromkeys(t for t in tags if t))


def _pluck_tags(group):
if not group:
return []

if isinstance(group, dict):
return [str(k).strip() for k in group.keys()]

if isinstance(group, str):
return [group.strip()]
try:
items = list(group)
except TypeError:
return []

out = []
for item in items:
if isinstance(item, str):
out.append(item.strip())
continue
if isinstance(item, dict):
t = item.get("NodeTag") or item.get("node_tag")
else:
t = getattr(item, "NodeTag", None) or getattr(item, "node_tag", None)
if t:
out.append(str(t).strip())

return list(dict.fromkeys(out))


def members_delta(record, wanted):
cur = set(str(x).strip() for x in _extract_members_from_record(record))
want = set(str(x).strip() for x in (wanted or []))
to_add = sorted(list(want - cur))
to_remove = sorted(list(cur - want))
return to_add, to_remove


def reconcile_membership(
ctx,
db_name,
record,
wanted_members,
encrypted,
enc_key_path=None,
tls=None,
check_mode=False,
):
try:
record = get_record(ctx, db_name)
except Exception:
pass

to_add, to_remove = members_delta(record, wanted_members)
if not to_add and not to_remove:
return False, [], []

if check_mode:
return True, to_add, to_remove

changed_any = False

if encrypted and to_add:
if not enc_key_path:
raise RuntimeError(
"Database '{}' is encrypted. Adding nodes requires 'encryption_key' for key distribution.".format(db_name)
)
key = encsvc.read_key(enc_key_path)
encsvc.distribute_key(ctx, db_name, key, tls or TLSConfig(), only_tags=to_add)

for t in list(to_add):
if add_member_if_needed(ctx, db_name, t, tls=tls):
changed_any = True

if to_remove:
delete_database(ctx, db_name, from_nodes=to_remove, hard_delete=False, tls=tls, time_to_wait_sec=30)
changed_any = True

return changed_any, to_add, to_remove
def delete_database(ctx, db_name):
from ravendb.serverwide.operations.common import DeleteDatabaseOperation
ctx.maintenance_server().send(DeleteDatabaseOperation(db_name))
Loading