Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ ENV PYTHON=python3
RUN npm install -g \
node-gyp \
typescript@4.9.5
COPY package.json yarn.lock /usr/src/app/
COPY package.json yarn.lock scality-cloudserverclient-v1.0.9.tgz /usr/src/app/

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget to revert, I guess it's for testing

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Binary .tgz file checked into the repository. This should not be committed — use a proper npm registry reference or git-based dependency instead.

— Claude Code


RUN yarn install --production --ignore-optional --frozen-lockfile --ignore-engines --network-concurrency 1

Expand Down
17 changes: 16 additions & 1 deletion lib/api/apiUtils/object/versioning.js
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,6 @@ function restoreMetadata(objMD, metadataStoreParams) {
* version id of the null version
*/
function overwritingVersioning(objMD, metadataStoreParams) {
metadataStoreParams.updateMicroVersionId = true;
metadataStoreParams.amzStorageClass = objMD['x-amz-storage-class'];

// set correct originOp
Expand Down Expand Up @@ -566,6 +565,21 @@ function overwritingVersioning(objMD, metadataStoreParams) {
return options;
}

/**
* @param {Object} objectMD - plain object metadata (not an ObjectMD instance)
*/
function prepareMetadataForCascadedCrr(objectMD) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be in arsenal 🤔

@SylvainSenechal SylvainSenechal Jun 2, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no i think its fine here (talking to myself)

// Bump microVersionId so cascade CRR detects the change as a new revision.
// eslint-disable-next-line no-param-reassign
objectMD.microVersionId = versionIdUtils.generateVersionId(
config.instanceId, config.replicationGroupId);
if (objectMD.replicationInfo) {
// Clear isReplica, as a user modification is no longer purely a replica
// eslint-disable-next-line no-param-reassign
objectMD.replicationInfo.isReplica = false;
}
}

module.exports = {
decodeVersionId,
getVersionIdResHeader,
Expand All @@ -577,4 +591,5 @@ module.exports = {
preprocessingVersioningDelete,
overwritingVersioning,
decodeVID,
prepareMetadataForCascadedCrr,
};
4 changes: 3 additions & 1 deletion lib/api/objectDeleteTagging.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
const async = require('async');
const { errors } = require('arsenal');

const { decodeVersionId, getVersionIdResHeader, getVersionSpecificMetadataOptions }
const { decodeVersionId, getVersionIdResHeader, getVersionSpecificMetadataOptions,
prepareMetadataForCascadedCrr }
= require('./apiUtils/object/versioning');

const { standardMetadataValidateBucketAndObj } = require('../metadata/metadataUtils');
Expand Down Expand Up @@ -85,6 +86,7 @@ function objectDeleteTagging(authInfo, request, log, callback) {
}
// eslint-disable-next-line no-param-reassign
objectMD.originOp = 's3:ObjectTagging:Delete';
prepareMetadataForCascadedCrr(objectMD);
metadata.putObjectMD(bucket.getName(), objectKey, objectMD, params,
log, err =>
next(err, bucket, objectMD));
Expand Down
4 changes: 3 additions & 1 deletion lib/api/objectPutLegalHold.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ const async = require('async');
const { errors, errorInstances, s3middleware } = require('arsenal');

const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const { decodeVersionId, getVersionIdResHeader, getVersionSpecificMetadataOptions } =
const { decodeVersionId, getVersionIdResHeader, getVersionSpecificMetadataOptions,
prepareMetadataForCascadedCrr } =
require('./apiUtils/object/versioning');
const getReplicationInfo = require('./apiUtils/object/getReplicationInfo');
const metadata = require('../metadata/wrapper');
Expand Down Expand Up @@ -96,6 +97,7 @@ function objectPutLegalHold(authInfo, request, log, callback) {
}
// eslint-disable-next-line no-param-reassign
objectMD.originOp = 's3:ObjectLegalHold:Put';
prepareMetadataForCascadedCrr(objectMD);
metadata.putObjectMD(bucket.getName(), objectKey, objectMD, params,
log, err => next(err, bucket, objectMD));
},
Expand Down
4 changes: 3 additions & 1 deletion lib/api/objectPutRetention.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
const async = require('async');
const { errors, errorInstances, s3middleware } = require('arsenal');

const { decodeVersionId, getVersionIdResHeader, getVersionSpecificMetadataOptions } =
const { decodeVersionId, getVersionIdResHeader, getVersionSpecificMetadataOptions,
prepareMetadataForCascadedCrr } =
require('./apiUtils/object/versioning');
const { ObjectLockInfo, hasGovernanceBypassHeader } =
require('./apiUtils/object/objectLockHelpers');
Expand Down Expand Up @@ -119,6 +120,7 @@ function objectPutRetention(authInfo, request, log, callback) {
objectMD.replicationInfo, replicationInfo);
}
objectMD.originOp = 's3:ObjectRetention:Put';
prepareMetadataForCascadedCrr(objectMD);
/* eslint-enable no-param-reassign */
metadata.putObjectMD(bucket.getName(), objectKey, objectMD, params,
log, err => next(err, bucket, objectMD));
Expand Down
4 changes: 3 additions & 1 deletion lib/api/objectPutTagging.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
const async = require('async');
const { errors, s3middleware } = require('arsenal');

const { decodeVersionId, getVersionIdResHeader, getVersionSpecificMetadataOptions } =
const { decodeVersionId, getVersionIdResHeader, getVersionSpecificMetadataOptions,
prepareMetadataForCascadedCrr } =
require('./apiUtils/object/versioning');

const { standardMetadataValidateBucketAndObj } = require('../metadata/metadataUtils');
Expand Down Expand Up @@ -89,6 +90,7 @@ function objectPutTagging(authInfo, request, log, callback) {
}
// eslint-disable-next-line no-param-reassign
objectMD.originOp = 's3:ObjectTagging:Put';
prepareMetadataForCascadedCrr(objectMD);
metadata.putObjectMD(bucket.getName(), objectKey, objectMD, params,
log, err =>
next(err, bucket, objectMD));
Expand Down
3 changes: 2 additions & 1 deletion lib/metadata/acl.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { errors } = require('arsenal');

const getReplicationInfo = require('../api/apiUtils/object/getReplicationInfo');
const { prepareMetadataForCascadedCrr } = require('../api/apiUtils/object/versioning');
const aclUtils = require('../utilities/aclUtils');
const constants = require('../../constants');
const metadata = require('../metadata/wrapper');
Expand Down Expand Up @@ -56,7 +57,7 @@ const acl = {
...replicationInfo,
};
}

prepareMetadataForCascadedCrr(objectMD);
return metadata.putObjectMD(bucket.getName(), objectKey, objectMD, params, log, cb);
}
return cb();
Expand Down
119 changes: 110 additions & 9 deletions lib/routes/routeBackbeat.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const joi = require('@hapi/joi');
const backbeatProxy = httpProxy.createProxyServer({
ignorePath: true,
});
const { auth, errors, errorInstances, s3middleware, s3routes, models, storage } =
const { auth, errors, errorInstances, s3middleware, s3routes, models, storage, versioning } =
require('arsenal');

const { responseJSONBody } = s3routes.routesUtils;
Expand All @@ -25,6 +25,8 @@ const { dataStore } = require('../api/apiUtils/object/storeObject');
const prepareRequestContexts = require(
'../api/apiUtils/authorization/prepareRequestContexts');
const { decodeVersionId } = require('../api/apiUtils/object/versioning');
const writeContinue = require('../utilities/writeContinue');
const { decode, encode, checkCrrCascadeEvent } = versioning.VersionID;
const locationKeysHaveChanged
= require('../api/apiUtils/object/locationKeysHaveChanged');
const { standardMetadataValidateBucketAndObj,
Expand All @@ -47,6 +49,8 @@ const quotaUtils = require('../api/apiUtils/quotas/quotaUtils');
const { handleAuthorizationResults } = require('../api/api');
const { versioningPreprocessing }
= require('../api/apiUtils/object/versioning');
const getReplicationInfo = require('../api/apiUtils/object/getReplicationInfo');
const { VersionIdCollisionException, StaleMicroVersionIdException } = require('@scality/cloudserverclient');
Comment thread
SylvainSenechal marked this conversation as resolved.
const {promisify} = require('util');

const versioningPreprocessingPromised = promisify(versioningPreprocessing);
Expand Down Expand Up @@ -106,7 +110,7 @@ function _isObjectRequest(req) {
].includes(req.resourceType);
}

function _respondWithHeaders(response, payload, extraHeaders, log, callback) {
function _respondWithHeaders(response, payload, extraHeaders, log, callback, statusCode = 200) {
let body = '';
if (typeof payload === 'string') {
body = payload;
Expand All @@ -125,10 +129,10 @@ function _respondWithHeaders(response, payload, extraHeaders, log, callback) {
// eslint-disable-next-line no-param-reassign
response.serverAccessLog.endTurnAroundTime = process.hrtime.bigint();
}
response.writeHead(200, httpHeaders);
response.writeHead(statusCode, httpHeaders);
response.end(body, 'utf8', () => {
log.end().info('responded with payload', {
httpCode: 200,
httpCode: statusCode,
contentLength: Buffer.byteLength(body),
});
callback();
Expand Down Expand Up @@ -430,6 +434,27 @@ function putData(request, response, bucketInfo, objMd, log, callback) {
log.error(errMessage);
return callback(errorInstances.BadRequest.customizeDescription(errMessage));
}

const incomingVersionIdEncoded = request.headers['x-scal-source-version-id'];
const decoded = incomingVersionIdEncoded ? decode(incomingVersionIdEncoded) : null;
const incomingVersionIdDecoded = decoded instanceof Error ? null : decoded;
if (incomingVersionIdDecoded && objMd && objMd.versionId === incomingVersionIdDecoded) {
// Skip the write if data is already at destination for this version id
// Return 409 with the existing microVersionId so backbeat can
// decide if putMetadata is still needed
log.debug('crr cascade putData: version already at destination', {
method: 'putData',
bucketName: request.bucketName,
objectKey: request.objectKey,
hasMicroVersionId: !!objMd.microVersionId,
});
return _respondWithHeaders(response,
{ code: VersionIdCollisionException.name, message: 'version id already at destination' },
{ 'x-scal-micro-version-id': objMd.microVersionId ? encode(objMd.microVersionId) : '' },
log, callback, 409);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

putData returns a 409 response before calling writeContinue. If the client sent Expect: 100-continue with a body, returning 409 before consuming (or explicitly rejecting) the request stream may leave the connection in a broken state — the client could be waiting for 100-continue and then start sending data that nobody reads. Consider either calling request.destroy() or request.resume() before responding, or calling writeContinue and consuming the body before checking.

— Claude Code

}

writeContinue(request, response);
const context = {
bucketName: request.bucketName,
owner: canonicalID,
Expand Down Expand Up @@ -541,6 +566,31 @@ function getCanonicalIdsByAccountId(accountId, log, cb) {
}

function putMetadata(request, response, bucketInfo, objMd, log, callback) {
const { bucketName, objectKey } = request;

const encodedMicroVersionId = request.headers['x-scal-micro-version-id'];
const decoded = encodedMicroVersionId ? decode(encodedMicroVersionId) : null;
const incomingRaw = decoded instanceof Error ? null : decoded;
if (incomingRaw) {
const event = checkCrrCascadeEvent(incomingRaw, objMd && objMd.microVersionId);
if (event === 'loop') {
log.debug('crr cascade putMetadata: loop detected, skipping write', {
method: 'putMetadata', bucketName, objectKey,
});
return _respondWithHeaders(response, {},
{ 'x-scal-replication-loop': 'true' }, log, callback);
}
if (event === 'stale') {
log.debug('crr cascade putMetadata: stale event, rejecting', {
method: 'putMetadata', bucketName, objectKey,
});
return _respondWithHeaders(response,
{ code: StaleMicroVersionIdException.name, message: 'incoming revision is older than destination' },
{}, log, callback, 409);
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to use the helper writeContinue() to allow the client to streamData.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok let me see I thought this was somewhat automatic

writeContinue(request, response);
return _getRequestPayload(request, (err, payload) => {
if (err) {
return callback(err);
Expand All @@ -554,14 +604,15 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
return callback(errors.MalformedPOSTRequest);
}

const { headers, bucketName, objectKey } = request;
const { headers } = request;

// Destination-side delete-marker replication.
// We need the REPLICA status to distinguish from
// source-side replication status updates that also carry isDeleteMarker=true.
if (omVal.isDeleteMarker
&& omVal.replicationInfo
&& omVal.replicationInfo.status === 'REPLICA'
&& (omVal.replicationInfo.isReplica === true
|| omVal.replicationInfo.status === 'REPLICA')
&& request.serverAccessLog) {
// eslint-disable-next-line no-param-reassign
request.serverAccessLog.replication = true;
Expand All @@ -575,7 +626,8 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// URI shape.
// The REPLICA status excludes source-side replication-status updates.
if (omVal.replicationInfo
&& omVal.replicationInfo.status === 'REPLICA'
&& (omVal.replicationInfo.isReplica === true
|| omVal.replicationInfo.status === 'REPLICA')
&& (omVal.originOp === 's3:ObjectTagging:Put'
|| omVal.originOp === 's3:ObjectTagging:Delete')
&& request.serverAccessLog) {
Expand All @@ -591,7 +643,8 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// populates the aclRequired field.
// The REPLICA status excludes source-side replication-status updates.
if (omVal.replicationInfo
&& omVal.replicationInfo.status === 'REPLICA'
&& (omVal.replicationInfo.isReplica === true
|| omVal.replicationInfo.status === 'REPLICA')
&& omVal.originOp === 's3:ObjectAcl:Put'
&& request.serverAccessLog) {
// eslint-disable-next-line no-param-reassign
Expand Down Expand Up @@ -669,7 +722,8 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// then we want to create a version for the replica object even though
// none was provided in the object metadata value.
if (omVal.replicationInfo.isNFS) {
const isReplica = omVal.replicationInfo.status === 'REPLICA';
const isReplica = omVal.replicationInfo.isReplica === true

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably use the new helper function from arsenal : getReplicationIsReplica ?

|| omVal.replicationInfo.status === 'REPLICA';
versioning = isReplica;
omVal.replicationInfo.isNFS = !isReplica;
}
Expand Down Expand Up @@ -721,6 +775,53 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
options.isNull = isNull;
}

// Cascade triggering
// If the bucket receiving this replica has its own CRR rules, set
// status to PENDING so the queue populator here picks it up for the
// next hop. If not, clear the source-side replicationInfo fields
// Always mark isReplica=true.
if (incomingRaw) {
const isMDOnly = headers['x-scal-replication-content'] === 'METADATA';
const objSize = omVal['content-length'] || 0;

// These S3-compatible Scality locations are excluded
// as cascade targets because they use the MultiBackend S3 path which
// bypasses the putData/putMetadata routes, so loop detection cannot fire
// on those destinations.
const BLOCKED_LOCATION_TYPES = [
'location-scality-ring-s3-v1',
'location-scality-artesca-s3-v1',
];

const nextReplInfo = getReplicationInfo(
config, objectKey, bucketInfo, isMDOnly, objSize,
null, null, null);

if (nextReplInfo) {
nextReplInfo.backends = nextReplInfo.backends.filter(b => {
const loc = config.locationConstraints[b.site];
return !loc || !BLOCKED_LOCATION_TYPES.includes(loc.type);
});
}

if (nextReplInfo && nextReplInfo.backends.length > 0) {
omVal.replicationInfo = nextReplInfo;
} else {
omVal.replicationInfo = {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should be an arsenal function as its updating metadata

status: '',
backends: [],
content: [],
destination: '',
storageClass: '',
role: '',
storageType: '',
dataStoreVersionId: '',
};
}

omVal.replicationInfo.isReplica = true;
}

return async.series([
// Zenko's CRR delegates replacing the account
// information to the destination's Cloudserver, as
Expand Down
2 changes: 1 addition & 1 deletion lib/routes/utilities/pushReplicationMetric.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { pushMetric } = require('../../utapi/utilities');

function getMetricToPush(prevObjectMD, newObjectMD) {
// We only want to update metrics for a destination bucket.
if (newObjectMD.getReplicationStatus() !== 'REPLICA') {
if (!newObjectMD.getReplicationIsReplica()) {
return null;
}

Expand Down
11 changes: 4 additions & 7 deletions lib/services.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ const services = {
lastModifiedDate, versioning, versionId, uploadId,
tagging, taggingCopy, replicationInfo, defaultRetention,
dataStoreName, creationTime, retentionMode, retentionDate,
legalHold, originOp, updateMicroVersionId, archive, oldReplayId,
deleteNullKey, amzStorageClass, overheadField, needOplogUpdate,
restoredEtag, bucketOwnerId } = params;
legalHold, originOp, archive,
oldReplayId, deleteNullKey, amzStorageClass, overheadField,
needOplogUpdate, restoredEtag, bucketOwnerId } = params;
log.trace('storing object in metadata');
assert.strictEqual(typeof bucketName, 'string');
const md = new ObjectMD();
Expand Down Expand Up @@ -189,10 +189,7 @@ const services = {
md.setUploadId(uploadId);
options.replayId = uploadId;
}
// update microVersionId when overwriting metadata.
if (updateMicroVersionId) {
md.updateMicroVersionId();
}
md.updateMicroVersionId(config.instanceId, config.replicationGroupId);
// update restore
if (archive) {
md.setAmzStorageClass(amzStorageClass);
Expand Down
10 changes: 7 additions & 3 deletions lib/utilities/collectResponseHeaders.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,13 @@ function collectResponseHeaders(objectMD, corsHeaders, versioningCfg,
responseMetaHeaders['x-amz-object-lock-legal-hold']
= objectMD.legalHold ? 'ON' : 'OFF';
}
if (objectMD.replicationInfo && objectMD.replicationInfo.status) {
responseMetaHeaders['x-amz-replication-status'] =
objectMD.replicationInfo.status;
const replInfo = objectMD.replicationInfo;
if (replInfo) {
if (replInfo.isReplica === true || replInfo.status === 'REPLICA') {
responseMetaHeaders['x-amz-replication-status'] = 'REPLICA';
} else if (replInfo.status) {
responseMetaHeaders['x-amz-replication-status'] = replInfo.status;
}
}
if (Array.isArray(objectMD?.replicationInfo?.backends)) {
objectMD.replicationInfo.backends.forEach(backend => {
Expand Down
Loading
Loading