Open
27 commits
60f1ff1
Add ignore param to createDocuments for silent duplicate handling
premtsd-code Apr 9, 2026
bee42d4
Add e2e tests for createDocuments ignore mode
premtsd-code Apr 9, 2026
93a9136
Fix Mongo adapter ignore mode: pass ignoreDuplicates to client and fi…
premtsd-code Apr 10, 2026
2906dda
Revert "Fix Mongo adapter ignore mode: pass ignoreDuplicates to clien…
premtsd-code Apr 10, 2026
63d9902
Replace ignore param with skipDuplicates scope guard
premtsd-code Apr 10, 2026
b0a8392
Push skipDuplicates scope guard down to Adapter layer
premtsd-code Apr 12, 2026
d8f647c
Refactor: extract helpers and collapse conditional wraps
premtsd-code Apr 12, 2026
c88c6ac
fix: guard buildUidTenantLookup against empty UID set
premtsd-code Apr 12, 2026
16849fe
Add e2e tests for skipDuplicates edge cases
premtsd-code Apr 13, 2026
c6d566e
SQL adapter: remove redundant skipDuplicates pre-filter
premtsd-code Apr 13, 2026
fd37b69
Mongo adapter: use upsertWithCounts for race-safe accurate counts
premtsd-code Apr 13, 2026
a24358c
Merge remote-tracking branch 'origin/main' into csv-import-upsert-v2
premtsd-code Apr 13, 2026
3b783af
Revert "SQL adapter: remove redundant skipDuplicates pre-filter"
premtsd-code Apr 13, 2026
3a483f2
Mirror: forward only docs the source actually inserted
premtsd-code Apr 13, 2026
eb99cf1
skipDuplicates: simplify by moving pre-filter to orchestrator only
premtsd-code Apr 13, 2026
89e4cf8
skipDuplicates: drop deferred-relationships, inline pre-filter, tight…
premtsd-code Apr 13, 2026
41704eb
Address Jake's review on PR #852
premtsd-code Apr 13, 2026
e9e5e76
Mirror::createDocuments: bound skipDuplicates capture to O($batchSize)
premtsd-code Apr 14, 2026
ae929db
Restore per-tenant grouping for batch existing-doc lookups in tenantP…
premtsd-code Apr 14, 2026
431d378
skipDuplicates: drop pre-filter, rely on adapter-level dedup
premtsd-code Apr 14, 2026
0fd7c33
skipDuplicates: drop intra-batch dedup, trim verbose test comments
premtsd-code Apr 14, 2026
9baaa04
Group skipDuplicates / getInsert* into their logical clusters
premtsd-code Apr 15, 2026
3c85013
Mirror::createDocuments: re-fetch source after skipDuplicates insert
premtsd-code Apr 15, 2026
934ec04
Mirror::createDocuments: pre-filter existing ids + unify skip/non-ski…
premtsd-code Apr 15, 2026
fa0e373
Chunk id-lookup queries to respect RELATION_QUERY_CHUNK_SIZE
premtsd-code Apr 15, 2026
52b189b
Chunk id lookups by \$this->maxQueryValues, not RELATION_QUERY_CHUNK_…
premtsd-code Apr 15, 2026
fbe5117
Address Jake's follow-up review: tighten limits, reuse tenantKey, let…
premtsd-code Apr 17, 2026
23 changes: 23 additions & 0 deletions src/Database/Adapter.php
@@ -33,6 +33,8 @@ abstract class Adapter

protected bool $alterLocks = false;

protected bool $skipDuplicates = false;

/**
* @var array<string, mixed>
*/
@@ -392,6 +394,27 @@ public function inTransaction(): bool
return $this->inTransaction > 0;
}

/**
* Run a callback with skipDuplicates enabled.
* Duplicate key errors during createDocuments() will be silently skipped
* instead of thrown. Nestable — saves and restores previous state.
*
* @template T
* @param callable(): T $callback
* @return T
*/
public function skipDuplicates(callable $callback): mixed
{
$previous = $this->skipDuplicates;
$this->skipDuplicates = true;

try {
return $callback();
} finally {
$this->skipDuplicates = $previous;
}
}

/**
* @template T
* @param callable(): T $callback
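The save-and-restore behaviour of the `skipDuplicates()` guard in this hunk can be illustrated in isolation. The following is a minimal sketch, not the library's class: `GuardDemo` and `isSkipping()` are hypothetical names introduced only for the demonstration, and only the flag handling mirrors the diff.

```php
<?php
// Hypothetical stand-in for an adapter; only the flag handling mirrors the diff.
final class GuardDemo
{
    private bool $skipDuplicates = false;

    // Hypothetical accessor, added so the sketch can observe the flag.
    public function isSkipping(): bool
    {
        return $this->skipDuplicates;
    }

    public function skipDuplicates(callable $callback): mixed
    {
        $previous = $this->skipDuplicates;
        $this->skipDuplicates = true;

        try {
            return $callback();
        } finally {
            // Runs on both return and throw, so the flag never leaks.
            $this->skipDuplicates = $previous;
        }
    }
}

$demo = new GuardDemo();

// Nested calls: the inner guard restores the flag to true, the outer one to false.
$states = $demo->skipDuplicates(function () use ($demo): array {
    $inner = $demo->skipDuplicates(fn (): bool => $demo->isSkipping());
    return [$inner, $demo->isSkipping()];
});

// An exception inside the callback must not leave the flag set.
try {
    $demo->skipDuplicates(function (): void {
        throw new RuntimeException('insert failed');
    });
} catch (RuntimeException $e) {
    // Expected in this sketch.
}
$afterThrow = $demo->isSkipping();
```

Restoring the previous value, rather than resetting to `false`, is what makes the guard safe to nest.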
41 changes: 41 additions & 0 deletions src/Database/Adapter/Mongo.php
@@ -122,6 +122,11 @@ public function withTransaction(callable $callback): mixed
return $callback();
}

// upsert + $setOnInsert hits WriteConflict (E112) under txn snapshot isolation.
if ($this->skipDuplicates) {
return $callback();
}

try {
$this->startTransaction();
$result = $callback();
@@ -1492,6 +1497,42 @@ public function createDocuments(Document $collection, array $documents): array
$records[] = $record;
}

// insertMany aborts the txn on any duplicate; upsert + $setOnInsert no-ops instead.
if ($this->skipDuplicates) {
if (empty($records)) {
return [];
}

$operations = [];
foreach ($records as $record) {
$filter = ['_uid' => $record['_uid'] ?? ''];
if ($this->sharedTables) {
$filter['_tenant'] = $record['_tenant'] ?? $this->getTenant();
}

// Filter fields can't reappear in $setOnInsert (mongo path-conflict error).
$setOnInsert = $record;
unset($setOnInsert['_uid'], $setOnInsert['_tenant']);

if (empty($setOnInsert)) {
continue;
}

$operations[] = [
'filter' => $filter,
'update' => ['$setOnInsert' => $setOnInsert],
];
}

try {
$this->client->upsert($name, $operations, $options);
} catch (MongoException $e) {
throw $this->processException($e);
}

return $documents;
abnegate marked this conversation as resolved.
}

try {
$documents = $this->client->insertMany($name, $records, $options);
} catch (MongoException $e) {
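The way the hunk above turns records into `$setOnInsert` upsert operations can be sketched without a Mongo client. This is an illustrative sketch only: `buildSkipDuplicateOperations()` is a hypothetical helper name, the tenant value is passed in rather than read from the adapter, and only the array shape follows the diff.

```php
<?php
/**
 * Hypothetical helper: turn flat records into upsert operations that only
 * write on insert. Mirrors the array shape used in the diff, nothing more.
 *
 * @param array<int, array<string, mixed>> $records
 * @return array<int, array<string, mixed>>
 */
function buildSkipDuplicateOperations(array $records, bool $sharedTables, ?int $tenant): array
{
    $operations = [];

    foreach ($records as $record) {
        $filter = ['_uid' => $record['_uid'] ?? ''];
        if ($sharedTables) {
            $filter['_tenant'] = $record['_tenant'] ?? $tenant;
        }

        // Fields used in the filter must not be repeated under $setOnInsert.
        $setOnInsert = $record;
        unset($setOnInsert['_uid'], $setOnInsert['_tenant']);

        if ($setOnInsert === []) {
            continue; // nothing left to write for this record
        }

        $operations[] = [
            'filter' => $filter,
            'update' => ['$setOnInsert' => $setOnInsert],
        ];
    }

    return $operations;
}

$ops = buildSkipDuplicateOperations(
    [
        ['_uid' => 'a1', '_tenant' => 7, 'name' => 'Ada'],
        ['_uid' => 'b2', 'name' => 'Bob'],
        ['_uid' => 'c3'], // only filter fields, so no operation is emitted
    ],
    true,
    42
);
```

Because every write sits under `$setOnInsert`, an operation whose filter matches an existing document changes nothing, which is the no-op behaviour the code comment in the diff relies on.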
15 changes: 15 additions & 0 deletions src/Database/Adapter/Pool.php
@@ -43,6 +43,11 @@ public function __construct(UtopiaPool $pool)
public function delegate(string $method, array $args): mixed
{
if ($this->pinnedAdapter !== null) {
if ($this->skipDuplicates) {
return $this->pinnedAdapter->skipDuplicates(
fn () => $this->pinnedAdapter->{$method}(...$args)
);
}
return $this->pinnedAdapter->{$method}(...$args);
}

@@ -66,6 +71,11 @@ public function delegate(string $method, array $args): mixed
$adapter->setMetadata($key, $value);
}

if ($this->skipDuplicates) {
return $adapter->skipDuplicates(
fn () => $adapter->{$method}(...$args)
);
}
return $adapter->{$method}(...$args);
});
}
@@ -146,6 +156,11 @@ public function withTransaction(callable $callback): mixed

$this->pinnedAdapter = $adapter;
try {
if ($this->skipDuplicates) {
return $adapter->skipDuplicates(
fn () => $adapter->withTransaction($callback)
);
}
return $adapter->withTransaction($callback);
} finally {
$this->pinnedAdapter = null;
29 changes: 29 additions & 0 deletions src/Database/Adapter/Postgres.php
@@ -2350,6 +2350,35 @@ public function getSupportForOptionalSpatialAttributeWithExistingRows(): bool
return false;
}

protected function getInsertKeyword(): string
{
return 'INSERT INTO';
}

protected function getInsertSuffix(string $table): string
{
if (!$this->skipDuplicates) {
return '';
}

$conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

protected function getInsertPermissionsSuffix(): string
{
if (!$this->skipDuplicates) {
return '';
}

$conflictTarget = $this->sharedTables
? '("_type", "_permission", "_document", "_tenant")'
: '("_type", "_permission", "_document")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

public function decodePoint(string $wkb): array
{
if (str_starts_with(strtoupper($wkb), 'POINT(')) {
36 changes: 33 additions & 3 deletions src/Database/Adapter/SQL.php
@@ -1029,6 +1029,33 @@ public function getSupportForHostname(): bool
return true;
}

/**
* Returns the INSERT keyword, optionally with IGNORE for duplicate handling.
* Override in adapter subclasses for DB-specific syntax.
*/
protected function getInsertKeyword(): string
{
return $this->skipDuplicates ? 'INSERT IGNORE INTO' : 'INSERT INTO';
}

/**
* Returns a suffix appended after VALUES clause for duplicate handling.
* Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING).
*/
protected function getInsertSuffix(string $table): string
{
return '';
}

/**
* Returns a suffix for the permissions INSERT statement when ignoring duplicates.
* Override in adapter subclasses for DB-specific syntax.
*/
protected function getInsertPermissionsSuffix(): string
{
return '';
}

/**
* Get current attribute count from collection document
*
@@ -2476,6 +2503,7 @@ public function createDocuments(Document $collection, array $documents): array
if (empty($documents)) {
return $documents;
}

$spatialAttributes = $this->getSpatialAttributes($collection);
$collection = $collection->getId();
try {
@@ -2573,8 +2601,9 @@ public function createDocuments(Document $collection, array $documents): array
$batchKeys = \implode(', ', $batchKeys);

$stmt = $this->getPDO()->prepare("
- INSERT INTO {$this->getSQLTable($name)} {$columns}
{$this->getInsertKeyword()} {$this->getSQLTable($name)} {$columns}
VALUES {$batchKeys}
{$this->getInsertSuffix($name)}
");

foreach ($bindValues as $key => $value) {
@@ -2588,8 +2617,9 @@ public function createDocuments(Document $collection, array $documents): array
$permissions = \implode(', ', $permissions);

$sqlPermissions = "
- INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
- VALUES {$permissions};
{$this->getInsertKeyword()} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
VALUES {$permissions}
{$this->getInsertPermissionsSuffix()}
";

$stmtPermissions = $this->getPDO()->prepare($sqlPermissions);
5 changes: 5 additions & 0 deletions src/Database/Adapter/SQLite.php
@@ -1936,4 +1936,9 @@ public function getSupportForTTLIndexes(): bool
{
return false;
}

protected function getInsertKeyword(): string
{
return $this->skipDuplicates ? 'INSERT OR IGNORE INTO' : 'INSERT INTO';
}
}
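Taken together, the keyword and suffix hooks in the SQL, Postgres, and SQLite adapters above produce three different duplicate-skipping statements. The sketch below assembles them with a single hypothetical function, `buildInsertSql()`, using a simplified column list and a `$dialect` string instead of subclass overrides; it assumes a unique index already exists on the Postgres conflict target.

```php
<?php
// Hypothetical, simplified stand-in for the adapters' keyword/suffix hooks.
function buildInsertSql(string $dialect, string $table, bool $skipDuplicates, bool $sharedTables): string
{
    $keyword = 'INSERT INTO';
    $suffix = '';

    if ($skipDuplicates) {
        switch ($dialect) {
            case 'postgres':
                // Postgres keeps the plain keyword and appends a conflict clause.
                $target = $sharedTables ? '("_uid", "_tenant")' : '("_uid")';
                $suffix = "ON CONFLICT {$target} DO NOTHING";
                break;
            case 'sqlite':
                $keyword = 'INSERT OR IGNORE INTO';
                break;
            default: // MySQL/MariaDB-style syntax from the base SQL adapter
                $keyword = 'INSERT IGNORE INTO';
        }
    }

    $columns = $sharedTables ? '(_uid, _tenant)' : '(_uid)';
    $values = $sharedTables ? '(:uid, :tenant)' : '(:uid)';

    // trim() drops the trailing space left by an empty suffix.
    return trim("{$keyword} {$table} {$columns} VALUES {$values} {$suffix}");
}

$postgres = buildInsertSql('postgres', 'docs', true, true);
$sqlite = buildInsertSql('sqlite', 'docs', true, false);
$mariadb = buildInsertSql('mariadb', 'docs', true, false);
$plain = buildInsertSql('mariadb', 'docs', false, false);
```

Keeping the variation in a keyword hook and a suffix hook means the shared `createDocuments()` template only needs two interpolation points, whichever dialect is in use.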
96 changes: 81 additions & 15 deletions src/Database/Database.php
@@ -417,6 +417,8 @@ class Database

protected bool $preserveDates = false;

protected bool $skipDuplicates = false;

protected bool $preserveSequence = false;

protected int $maxQueryValues = 5000;
@@ -842,6 +844,29 @@ public function skipRelationshipsExistCheck(callable $callback): mixed
}
}

public function skipDuplicates(callable $callback): mixed
{
$previous = $this->skipDuplicates;
$this->skipDuplicates = true;

try {
return $callback();
} finally {
$this->skipDuplicates = $previous;
}
}

/**
* Build a tenant-aware identity key for a document.
* Returns "<tenant>:<id>" in tenant-per-document shared-table mode, otherwise just the id.
*/
private function tenantKey(Document $document): string
{
return ($this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument())
? $document->getTenant() . ':' . $document->getId()
: $document->getId();
}

/**
* Trigger callback for events
*
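Why `tenantKey()` in this hunk prefixes the tenant can be shown with two documents that share an id across tenants. This is a minimal sketch under stated assumptions: documents are plain arrays rather than `Document` objects, and the free function `tenantKey()` here is a hypothetical simplification of the private method.

```php
<?php
// Hypothetical minimal document shape: ['id' => string, 'tenant' => int].
function tenantKey(array $doc, bool $tenantPerDocument): string
{
    return $tenantPerDocument
        ? $doc['tenant'] . ':' . $doc['id']
        : $doc['id'];
}

$docs = [
    ['id' => 'usage', 'tenant' => 1],
    ['id' => 'usage', 'tenant' => 2],
];

$byId = [];
$byTenantKey = [];
foreach ($docs as $doc) {
    // Keyed by id alone, the second document overwrites the first.
    $byId[tenantKey($doc, false)] = $doc;
    // Keyed by tenant and id, both documents survive.
    $byTenantKey[tenantKey($doc, true)] = $doc;
}
```

A lookup map keyed by id alone would hand one tenant's existing document to another tenant's upsert, which is the collision the tenant-aware key avoids.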
@@ -5700,9 +5725,11 @@ public function createDocuments(
}

foreach (\array_chunk($documents, $batchSize) as $chunk) {
- $batch = $this->withTransaction(function () use ($collection, $chunk) {
- return $this->adapter->createDocuments($collection, $chunk);
- });
$insert = fn () => $this->withTransaction(fn () => $this->adapter->createDocuments($collection, $chunk));
// Set adapter flag before withTransaction so Mongo can opt out of a real txn.
$batch = $this->skipDuplicates
? $this->adapter->skipDuplicates($insert)
: $insert();
Comment on lines +5728 to +5732
Contributor

⚠️ Potential issue | 🔴 Critical

Guard the relationship writes, not just the final batch insert.

skipDuplicates() only wraps adapter->createDocuments(), but createDocumentRelationships() already ran at Line 5721. If one of these parents is skipped as a duplicate, any child/junction writes done during relationship expansion are still applied, and many-to-many junction inserts can still throw DuplicateException because createDocument() does not honor this guard. That breaks the advertised “silently skip duplicate rows” behavior and can leave orphan relationship data.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Database/Database.php` around lines 5728 - 5732, The relationship writes
(createDocumentRelationships / any createDocument calls) must be wrapped by the
same skipDuplicates guard as the batch insert so duplicates are skipped
atomically; move or refactor so the closure passed to adapter->skipDuplicates
(or the $insert closure) contains both the relationship expansion and the
adapter->createDocuments call (or call createDocumentRelationships inside the
withTransaction closure) so that adapter->skipDuplicates can opt out/skip all
child/junction writes, not just createDocuments. Ensure you reference and update
the existing $insert closure, createDocumentRelationships, createDocument, and
adapter->skipDuplicates usage so relationship writes occur inside the
guarded/transactional block.


$batch = $this->adapter->getSequences($collection->getId(), $batch);

@@ -7116,18 +7143,57 @@ public function upsertDocumentsWithIncrease(
$created = 0;
$updated = 0;
$seenIds = [];
- foreach ($documents as $key => $document) {
- if ($this->getSharedTables() && $this->getTenantPerDocument()) {
- $old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument(
- $collection->getId(),
- $document->getId(),
- ))));
- } else {
- $old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument(
- $collection->getId(),
- $document->getId(),
- )));

// Batch-fetch existing documents in one query instead of N individual getDocument() calls.
// tenantPerDocument: group ids by tenant and run one find() per tenant under withTenant,
// so cross-tenant batches (e.g. StatsUsage worker) don't get silently scoped to the
// session tenant and miss rows belonging to other tenants.
$existingDocs = [];

if ($this->getSharedTables() && $this->getTenantPerDocument()) {
$idsByTenant = [];
foreach ($documents as $doc) {
if ($doc->getId() !== '') {
$idsByTenant[$doc->getTenant()][] = $doc->getId();
}
}
foreach ($idsByTenant as $tenant => $tenantIds) {
$tenantIds = \array_values(\array_unique($tenantIds));
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $chunk) {
$found = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(
fn () => $this->find($collection->getId(), [
Query::equal('$id', $chunk),
Query::limit($this->maxQueryValues),
])
)));
foreach ($found as $doc) {
$existingDocs[$this->tenantKey($doc)] = $doc;
}
}
}
} else {
$docIds = \array_values(\array_unique(\array_filter(
\array_map(fn (Document $doc) => $doc->getId(), $documents),
fn ($id) => $id !== ''
)));

if (!empty($docIds)) {
foreach (\array_chunk($docIds, \max(1, $this->maxQueryValues)) as $chunk) {
$existing = $this->authorization->skip(fn () => $this->silent(
fn () => $this->find($collection->getId(), [
Query::equal('$id', $chunk),
Query::limit($this->maxQueryValues),
])
));
foreach ($existing as $doc) {
$existingDocs[$this->tenantKey($doc)] = $doc;
}
}
}
}
Comment on lines +7153 to +7193
Contributor

@coderabbitai coderabbitai bot Apr 17, 2026

⚠️ Potential issue | 🟠 Major

Restore the no-auth skipPreRead branch for increment upserts.

This unconditional prefetch changes the !empty($attribute) && !$this->authorization->getStatus() path. In that mode, incoming $permissions are supposed to be ignored; with $old always hydrated, the current skipPermissionsUpdate logic can now persist caller-supplied permissions on creates/updates. That is a behavior regression in upsertDocumentsWithIncrease().

Based on learnings, Database::upsertDocumentsWithIncrease() uses $skipPreRead = !empty($attribute) && !$this->authorization->getStatus(), and in that path incoming $permissions must be ignored while creates default to [] and updates preserve existing permissions.

Also applies to: 7213-7314

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Database/Database.php` around lines 7153 - 7193, The unconditional
prefetch removed the no-auth "skipPreRead" behavior used by
upsertDocumentsWithIncrease(), causing caller-supplied $permissions to be
persisted incorrectly; restore the original branch: detect $skipPreRead =
!empty($attribute) && !$this->authorization->getStatus() in
upsertDocumentsWithIncrease(), and when $skipPreRead is true skip the pre-read
of existing documents (do not hydrate $old/$existingDocs), ensure creates get
default permissions [] and updates preserve existing permissions (i.e. ignore
incoming $permissions), and apply the same conditional (no prefetch when
skipPreRead) to the other similar block referenced (around the later section
7213-7314) so both code paths behave consistently.

Contributor Author

skipPreRead has not been merged into the main branch yet.

Contributor

@premtsd-code Thanks for the clarification — acknowledged. Since skipPreRead hasn’t landed in main yet, my earlier regression note tied to that path doesn’t apply to this PR. The batch pre-read you added in upsertDocumentsWithIncrease() is fine as-is.

If you’d like, I can open a follow-up issue to re-check permission semantics once skipPreRead merges (so we ensure “updates preserve existing permissions; creates default to []; ignore incoming $permissions” in the no-auth + increment case). Happy to proceed if you want that tracked.


🧠 Learnings used
Learnt from: premtsd-code
Repo: utopia-php/database PR: 850
File: src/Database/Database.php:0-0
Timestamp: 2026-04-08T16:07:15.177Z
Learning: In src/Database/Database.php, both createDocuments() and upsertDocumentsWithIncrease() prefetch existing document IDs in chunks using array_chunk(..., max(1, $this->maxQueryValues)) for tenant-per-document and non-tenant modes to satisfy DocumentsValidator limits on Query::equal('$id', ...).

Learnt from: premtsd-code
Repo: utopia-php/database PR: 830
File: src/Database/Database.php:7086-7092
Timestamp: 2026-03-08T22:55:28.782Z
Learning: In src/Database/Database.php, Database::upsertDocumentsWithIncrease() sets $skipPreRead = !empty($attribute) && !$this->authorization->getStatus(), so the pre-read-skip path only runs when authorization is disabled. In this path, explicit $permissions on incoming documents are intentionally ignored; updates preserve existing permissions, and creates default to empty permissions ([]), since no auth layer enforces them. Stated by maintainer premtsd-code on PR `#830`.

Learnt from: fogelito
Repo: utopia-php/database PR: 733
File: src/Database/Adapter/MariaDB.php:1801-1806
Timestamp: 2025-10-16T09:37:33.531Z
Learning: In the MariaDB adapter (src/Database/Adapter/MariaDB.php), only duplicate `_uid` violations should throw `DuplicateException`. All other unique constraint violations, including `PRIMARY` key collisions on the internal `_id` field, should throw `UniqueException`. This is the intended design to distinguish between user-facing document duplicates and internal/user-defined unique constraint violations.

Learnt from: abnegate
Repo: utopia-php/database PR: 721
File: tests/e2e/Adapter/Scopes/DocumentTests.php:6418-6439
Timestamp: 2025-10-03T02:04:17.803Z
Learning: In tests/e2e/Adapter/Scopes/DocumentTests::testSchemalessDocumentInvalidInteralAttributeValidation (PHP), when the adapter reports getSupportForAttributes() === false (schemaless), the test should not expect exceptions from createDocuments for “invalid” internal attributes; remove try/catch and ensure the test passes without exceptions, keeping at least one assertion.
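
The chunked id prefetch used in upsertDocumentsWithIncrease() above, and described in the first learning, can be sketched on its own. This is an illustrative sketch: `chunkIds()` is a hypothetical helper, and the actual `find()` call per chunk is left out.

```php
<?php
/**
 * Hypothetical helper: drop empty ids, de-duplicate, and split the rest
 * into chunks no larger than $maxQueryValues.
 *
 * @param array<int, string> $ids
 * @return array<int, array<int, string>>
 */
function chunkIds(array $ids, int $maxQueryValues): array
{
    $ids = array_values(array_unique(array_filter(
        $ids,
        fn (string $id): bool => $id !== ''
    )));

    if ($ids === []) {
        return [];
    }

    // max(1, ...) guards against a zero or negative limit, which would
    // make array_chunk() throw.
    return array_chunk($ids, max(1, $maxQueryValues));
}

$chunks = chunkIds(['a', 'b', '', 'a', 'c', 'd', 'e'], 2);
$zeroLimit = chunkIds(['x'], 0);
$onlyEmpty = chunkIds(['', ''], 5);
```

Each chunk would then back one `Query::equal('$id', $chunk)` lookup, so no single query carries more values than the configured limit.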


foreach ($documents as $key => $document) {
$old = $existingDocs[$this->tenantKey($document)] ?? new Document();

// Extract operators early to avoid comparison issues
$documentArray = $document->getArrayCopy();
@@ -7294,7 +7360,7 @@ public function upsertDocumentsWithIncrease(
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
}

- $seenIds[] = $document->getId();
$seenIds[] = $this->tenantKey($document);
$old = $this->adapter->castingBefore($collection, $old);
$document = $this->adapter->castingBefore($collection, $document);
