Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false">
stopOnFailure="true">
<testsuites>
<testsuite name="unit">
<directory>./tests/unit</directory>
Expand Down
100 changes: 75 additions & 25 deletions src/Database/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ class Database

protected ?\DateTime $timestamp = null;

protected ?Document $cursor = null;

protected bool $resolveRelationships = true;

protected bool $checkRelationshipsExist = true;
Expand Down Expand Up @@ -614,6 +616,26 @@ public function withRequestTimestamp(?\DateTime $requestTimestamp, callable $cal
return $result;
}

/**
* Executes $callback with $cursor Document
*
* @template T
* @param callable(): T $callback
* @return T
*/
public function withCursor(?Document $cursor, callable $callback): mixed
{
$previous = $this->cursor;
$this->cursor = $cursor;

try {
$result = $callback();
} finally {
$this->cursor = $previous;
}
return $result;
}

/**
* Set Namespace.
*
Expand Down Expand Up @@ -4358,10 +4380,6 @@ public function updateDocuments(
$limit = $grouped['limit'];
$cursor = $grouped['cursor'];

if (!empty($cursor) && $cursor->getCollection() !== $collection->getId()) {
throw new DatabaseException("cursor Document must be from the same Collection.");
}

unset($updates['$id']);
unset($updates['$createdAt']);
unset($updates['$tenant']);
Expand Down Expand Up @@ -4403,7 +4421,7 @@ public function updateDocuments(
];

if (!empty($last)) {
$new[] = Query::cursorAfter($last);
$new[] = Query::cursorAfter($last->getId());
}

$batch = $this->silent(fn () => $this->find(
Expand Down Expand Up @@ -5755,13 +5773,8 @@ public function deleteDocuments(
$grouped = Query::groupByType($queries);
$limit = $grouped['limit'];
$cursor = $grouped['cursor'];

if (!empty($cursor) && $cursor->getCollection() !== $collection->getId()) {
throw new DatabaseException("Cursor document must be from the same Collection.");
}

$originalLimit = $limit;
$last = $cursor;
$last = null;
$modified = 0;

while (true) {
Expand All @@ -5775,17 +5788,21 @@ public function deleteDocuments(
Query::limit($batchSize)
];

if (!empty($last)) {
$new[] = Query::cursorAfter($last);
if (!empty($cursor)) {
$new[] = Query::cursorAfter($cursor);
}

/**
* @var array<Document> $batch
*/
$batch = $this->silent(fn () => $this->find(
$collection->getId(),
array_merge($new, $queries),
forPermission: Database::PERMISSION_DELETE

$batch = $this->silent(fn () => $this->withCursor(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, thinking perhaps add a second param to the Query::cursor , which will be the payload
query::cursor(string $id, Document $payload)
So if we have a payload will use that and not the $id?

$last,
fn () => $this->find(
$collection->getId(),
array_merge($new, $queries),
forPermission: Database::PERMISSION_DELETE
)
));

if (empty($batch)) {
Expand All @@ -5795,7 +5812,16 @@ public function deleteDocuments(
$internalIds = [];
$permissionIds = [];
foreach ($batch as $document) {
if (empty($document->getInternalId())){
throw new QueryException('$internalId must not be empty');
}

$internalIds[] = $document->getInternalId();

if (!isset($document['$permissions'])) {
throw new QueryException('$permissions key is missing');
}

if (!empty($document->getPermissions())) {
$permissionIds[] = $document->getId();
}
Expand All @@ -5808,6 +5834,10 @@ public function deleteDocuments(
}

// Check if document was updated after the request timestamp
if (empty($document->getUpdatedAt())){
throw new QueryException('$updatedAt must not be empty');
}

try {
$oldUpdatedAt = new \DateTime($document->getUpdatedAt());
} catch (Exception $e) {
Expand All @@ -5819,6 +5849,16 @@ public function deleteDocuments(
}
}

$last = $batch[array_key_last($batch)];
$cursor = $last->getId();
/**
* Since we delete data, no cursor will be found later on so we need to assign a payload
* Independent Cursor data regardless to find selects queries
* todo: add specific selects... (order by , $id, $internalId))
* Do we need silent here?
*/
$last = $this->silent(fn () => $this->getDocument($collection->getId(), $cursor));

$this->withTransaction(function () use ($collection, $internalIds, $permissionIds) {
$this->adapter->deleteDocuments(
$collection->getId(),
Expand All @@ -5845,8 +5885,6 @@ public function deleteDocuments(
} elseif ($originalLimit && $modified >= $originalLimit) {
break;
}

$last = \end($batch);
}

$this->trigger(self::EVENT_DOCUMENTS_DELETE, new Document([
Expand Down Expand Up @@ -5960,14 +5998,26 @@ public function find(string $collection, array $queries = [], string $forPermiss
$offset = $grouped['offset'];
$orderAttributes = $grouped['orderAttributes'];
$orderTypes = $grouped['orderTypes'];
$cursor = $grouped['cursor'];
$cursorId = $grouped['cursor'];
$cursor = [];
$cursorDirection = $grouped['cursorDirection'];

if (!empty($cursor) && $cursor->getCollection() !== $collection->getId()) {
throw new DatabaseException("cursor Document must be from the same Collection.");
}
if (!empty($cursorId)) {
if (!is_null($this->cursor) && !$this->cursor->isEmpty()) {
$cursor = $this->cursor;
} else {
/**
* todo: add specific select queries, Only what you need for the cursor
*/
$cursor = $this->getDocument($collection->getId(), $cursorId);
}

$cursor = empty($cursor) ? [] : $this->encode($collection, $cursor)->getArrayCopy();
if ($cursor->isEmpty()) {
throw new DatabaseException("Cursor not found");
}

$cursor = $this->encode($collection, $cursor)->getArrayCopy();
}

/** @var array<Query> $queries */
$queries = \array_merge(
Expand Down Expand Up @@ -6105,7 +6155,7 @@ public function foreach(string $collection, callable $callback, array $queries =
array_unshift($newQueries, Query::offset(0));
}

array_unshift($newQueries, Query::cursorAfter($latestDocument));
array_unshift($newQueries, Query::cursorAfter($latestDocument->getId()));
}
if (!$limitExists) {
$newQueries[] = Query::limit($limit);
Expand Down
8 changes: 4 additions & 4 deletions src/Database/Query.php
Original file line number Diff line number Diff line change
Expand Up @@ -512,21 +512,21 @@ public static function offset(int $value): self
/**
* Helper method to create Query with cursorAfter method
*
* @param Document $value
* @param string $value
* @return Query
*/
public static function cursorAfter(Document $value): self
public static function cursorAfter(string $value): self
{
return new self(self::TYPE_CURSOR_AFTER, values: [$value]);
}

/**
* Helper method to create Query with cursorBefore method
*
* @param Document $value
* @param string $value
* @return Query
*/
public static function cursorBefore(Document $value): self
public static function cursorBefore(string $value): self
{
return new self(self::TYPE_CURSOR_BEFORE, values: [$value]);
}
Expand Down
6 changes: 3 additions & 3 deletions src/Database/Validator/Query/Cursor.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public function isValid($value): bool
if ($method === Query::TYPE_CURSOR_AFTER || $method === Query::TYPE_CURSOR_BEFORE) {
$cursor = $value->getValue();

if ($cursor instanceof Document) {
$cursor = $cursor->getId();
}
// if ($cursor instanceof Document) {
// $cursor = $cursor->getId();
// }

$validator = new UID();
if ($validator->isValid($cursor)) {
Expand Down
Loading
Loading