* Revert "refactor(server/upload): convert js to ts (#2618)"

This reverts commit e9f7286f67.

* Revert "feat(logging): pass request logger to core/services/objects (#2599)"

This reverts commit ee3e9af78d.

* Revert "feat(server): configurable maximum objects POST size and improved logging (#2594)"

This reverts commit 55cad9662a.
Iain Sproat 2024-08-12 10:26:53 +01:00 committed by GitHub
Parent fc079b2b71
Commit f2c5677b4a
No key found corresponding to this signature
GPG key ID: B5690EEEBB952194
23 changed files with 377 additions and 584 deletions
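Taken together, these reverts restore the plain-JavaScript /objects/:streamId upload handler, return the core/services/objects functions to positional arguments, and drop both the request-scoped logger and the configurable MAX_OBJECT_UPLOAD_FILE_SIZE_MB upload limit. A minimal sketch of the call-site change, reconstructed from the hunks below (illustrative, not part of the diff):

// After the revert: positional arguments, module-level logger only.
const id = await createObject(streamId, object)
const ids = await createObjects(streamId, objects)

// Before the revert (now removed): a single params object, optionally
// carrying the request-scoped logger.
const idBefore = await createObject({ streamId, object })
const idsBefore = await createObjects({ streamId, objects, logger: req.log })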

View File

@@ -31,7 +31,7 @@ module.exports = class ServerAPI {
obj.id = crypto.createHash('md5').update(JSON.stringify(obj)).digest('hex')
}
await this.createObject({ streamId: this.streamId, object: obj })
await this.createObject(this.streamId, obj)
return obj.id
}
@@ -40,7 +40,7 @@
return await this.createObjectsBatched(this.streamId, objs)
}
async createObject({ streamId, object }) {
async createObject(streamId, object) {
const insertionObject = this.prepInsertionObject(streamId, object)
const closures = []

View File

@@ -129,7 +129,7 @@ describe('Activity @activity', () => {
streamSecret.id = resStream1.body.data.streamCreate
// create commit (cr2)
testObj2.id = await createObject({ streamId: streamSecret.id, object: testObj2 })
testObj2.id = await createObject(streamSecret.id, testObj2)
const resCommit1 = await sendRequest(userCr.token, {
query: `mutation { commitCreate(commit: {streamId: "${streamSecret.id}", branchName: "main", objectId: "${testObj2.id}", message: "first commit"})}`
})
@@ -152,7 +152,7 @@ describe('Activity @activity', () => {
branchPublic.id = resBranch.body.data.branchCreate
// create commit #2 (iz3)
testObj.id = await createObject({ streamId: streamPublic.id, object: testObj })
testObj.id = await createObject(streamPublic.id, testObj)
const resCommit2 = await sendRequest(userIz.token, {
query: `mutation { commitCreate(commit: { streamId: "${streamPublic.id}", branchName: "${branchPublic.name}", objectId: "${testObj.id}", message: "first commit" })}`
})

View File

@@ -286,7 +286,7 @@ const queryComments = async ({ apollo, resources, shouldSucceed }) => {
bar: crs({ length: 5 })
}
const objectId = await createObject({ streamId: resources.streamId, object })
const objectId = await createObject(resources.streamId, object)
const numberOfComments = 3
const commentIds = await Promise.all(
@@ -361,12 +361,9 @@ const queryStreamCommentCount = async ({ apollo, resources, shouldSucceed }) =>
}
const queryObjectCommentCount = async ({ apollo, resources, shouldSucceed }) => {
const objectId = await createObject({
streamId: resources.streamId,
object: {
foo: 'bar',
noise: crs({ length: 5 })
}
const objectId = await createObject(resources.streamId, {
foo: 'bar',
noise: crs({ length: 5 })
})
await createComment({
userId: resources.testActorId,
@@ -397,12 +394,9 @@ const queryObjectCommentCount = async ({ apollo, resources, shouldSucceed }) =>
}
const queryCommitCommentCount = async ({ apollo, resources, shouldSucceed }) => {
const objectId = await createObject({
streamId: resources.streamId,
object: {
foo: 'bar',
notSignal: crs({ length: 10 })
}
const objectId = await createObject(resources.streamId, {
foo: 'bar',
notSignal: crs({ length: 10 })
})
const commitId = await createCommitByBranchName({
streamId: resources.streamId,
@@ -444,12 +438,9 @@ const queryCommitCollectionCommentCount = async ({
resources,
shouldSucceed
}) => {
const objectId = await createObject({
streamId: resources.streamId,
object: {
foo: 'bar',
almostMakesSense: crs({ length: 10 })
}
const objectId = await createObject(resources.streamId, {
foo: 'bar',
almostMakesSense: crs({ length: 10 })
})
const commitId = await createCommitByBranchName({
streamId: resources.streamId,
@@ -852,10 +843,7 @@ describe('Graphql @comments', () => {
})
}
const objectId = await createObject({
streamId: stream.id,
object: { test: 'object' }
})
const objectId = await createObject(stream.id, { test: 'object' })
const { id: commentId } = await createComment({
userId: myTestActor.id,

View File

@@ -105,8 +105,8 @@ describe('Comments @comments', () => {
stream.id = await createStream({ ...stream, ownerId: user.id })
testObject1.id = await createObject({ streamId: stream.id, object: testObject1 })
testObject2.id = await createObject({ streamId: stream.id, object: testObject2 })
testObject1.id = await createObject(stream.id, testObject1)
testObject2.id = await createObject(stream.id, testObject2)
commitId1 = await createCommitByBranchName({
streamId: stream.id,
@@ -163,7 +163,7 @@ describe('Comments @comments', () => {
const streamA = { name: 'Stream A' }
streamA.id = await createStream({ ...streamA, ownerId: user.id })
const objA = { foo: 'bar' }
objA.id = await createObject({ streamId: streamA.id, object: objA })
objA.id = await createObject(streamA.id, objA)
const commA = {}
commA.id = await createCommitByBranchName({
streamId: streamA.id,
@@ -177,7 +177,7 @@ describe('Comments @comments', () => {
const streamB = { name: 'Stream B' }
streamB.id = await createStream({ ...streamB, ownerId: otherUser.id })
const objB = { qux: 'mux' }
objB.id = await createObject({ streamId: streamB.id, object: objB })
objB.id = await createObject(streamB.id, objB)
const commB = {}
commB.id = await createCommitByBranchName({
streamId: streamB.id,
@@ -267,7 +267,7 @@ describe('Comments @comments', () => {
const stream = { name: 'Bean Counter' }
stream.id = await createStream({ ...stream, ownerId: user.id })
const obj = { foo: 'bar' }
obj.id = await createObject({ streamId: stream.id, object: obj })
obj.id = await createObject(stream.id, obj)
const commit = {}
commit.id = await createCommitByBranchName({
streamId: stream.id,
@@ -358,7 +358,7 @@ describe('Comments @comments', () => {
const streamOther = { name: 'Bean Counter' }
streamOther.id = await createStream({ ...streamOther, ownerId: user.id })
const objOther = { 'are you bored': 'yes' }
objOther.id = await createObject({ streamId: streamOther.id, object: objOther })
objOther.id = await createObject(streamOther.id, objOther)
const commitOther = {}
commitOther.id = await createCommitByBranchName({
streamId: streamOther.id,
@@ -560,10 +560,7 @@ describe('Comments @comments', () => {
})
it('Should not return the same comment multiple times for multi resource comments', async () => {
const localObjectId = await createObject({
streamId: stream.id,
object: { testObject: 1 }
})
const localObjectId = await createObject(stream.id, { testObject: 1 })
const commentCount = 3
for (let i = 0; i < commentCount; i++) {
@@ -603,11 +600,8 @@ describe('Comments @comments', () => {
})
it('Should handle cursor and limit for queries', async () => {
const localObjectId = await createObject({
streamId: stream.id,
object: {
testObject: 'something completely different'
}
const localObjectId = await createObject(stream.id, {
testObject: 'something completely different'
})
const createdComments = []
@@ -697,10 +691,7 @@ describe('Comments @comments', () => {
})
it('Should return all the referenced resources for a comment', async () => {
const localObjectId = await createObject({
streamId: stream.id,
object: { anotherTestObject: 1 }
})
const localObjectId = await createObject(stream.id, { anotherTestObject: 1 })
const inputResources = [
{ resourceId: stream.id, resourceType: 'stream' },
{ resourceId: commitId1, resourceType: 'commit' },
@@ -731,10 +722,7 @@ describe('Comments @comments', () => {
})
it('Should return the same data when querying a single comment vs a list of comments', async () => {
const localObjectId = await createObject({
streamId: stream.id,
object: { anotherTestObject: 42 }
})
const localObjectId = await createObject(stream.id, { anotherTestObject: 42 })
await createComment({
userId: user.id,
input: {
@@ -765,11 +753,8 @@ describe('Comments @comments', () => {
})
it('Should be able to edit a comment text', async () => {
const localObjectId = await createObject({
streamId: stream.id,
object: {
anotherTestObject: crs({ length: 10 })
}
const localObjectId = await createObject(stream.id, {
anotherTestObject: crs({ length: 10 })
})
const { id: commentId } = await createComment({
userId: user.id,
@@ -804,11 +789,8 @@ describe('Comments @comments', () => {
})
it('Should not be allowed to edit a comment of another user if its restricted', async () => {
const localObjectId = await createObject({
streamId: stream.id,
object: {
anotherTestObject: crs({ length: 10 })
}
const localObjectId = await createObject(stream.id, {
anotherTestObject: crs({ length: 10 })
})
const { id: commentId } = await createComment({
userId: user.id,
@@ -926,11 +908,8 @@ describe('Comments @comments', () => {
})
it('Should not query archived comments unless asked', async () => {
const localObjectId = await createObject({
streamId: stream.id,
object: {
testObject: crs({ length: 10 })
}
const localObjectId = await createObject(stream.id, {
testObject: crs({ length: 10 })
})
const commentCount = 15

View File

@@ -66,10 +66,10 @@ export = {
context.resourceAccessRules
)
const ids = await createObjects({
streamId: args.objectInput.streamId,
objects: args.objectInput.objects
})
const ids = await createObjects(
args.objectInput.streamId,
args.objectInput.objects
)
return ids
}
}

View File

@@ -0,0 +1,294 @@
'use strict'
const zlib = require('zlib')
const { corsMiddleware } = require('@/modules/core/configs/cors')
const Busboy = require('busboy')
const { validatePermissionsWriteStream } = require('./authUtils')
const { getFeatureFlags } = require('@/modules/shared/helpers/envHelper')
const {
createObjectsBatched,
createObjectsBatchedAndNoClosures
} = require('@/modules/core/services/objects')
const { ObjectHandlingError } = require('@/modules/core/errors/object')
const { estimateStringMegabyteSize } = require('@/modules/core/utils/chunking')
const MAX_FILE_SIZE = 50 * 1024 * 1024
const { FF_NO_CLOSURE_WRITES } = getFeatureFlags()
let objectInsertionService = createObjectsBatched
if (FF_NO_CLOSURE_WRITES) {
objectInsertionService = createObjectsBatchedAndNoClosures
}
module.exports = (app) => {
app.options('/objects/:streamId', corsMiddleware())
app.post('/objects/:streamId', corsMiddleware(), async (req, res) => {
req.log = req.log.child({
userId: req.context.userId || '-',
streamId: req.params.streamId
})
const hasStreamAccess = await validatePermissionsWriteStream(
req.params.streamId,
req
)
if (!hasStreamAccess.result) {
return res.status(hasStreamAccess.status).end()
}
let busboy
try {
busboy = Busboy({ headers: req.headers })
} catch (e) {
req.log.warn(
e,
'Failed to parse request headers and body content as valid multipart/form-data.'
)
return res
.status(400)
.send(
'Failed to parse request headers and body content as valid multipart/form-data.'
)
}
let totalProcessed = 0
// let last = {}
const promises = []
let requestDropped = false
busboy.on('file', (name, file, info) => {
const { mimeType } = info
if (requestDropped) return
if (mimeType === 'application/gzip') {
const buffer = []
file.on('data', (data) => {
if (data) buffer.push(data)
})
file.on('end', async () => {
req.log.info(
`File upload of the multipart form has reached an end of file (EOF) boundary. The mimetype of the file is '${mimeType}'.`
)
if (requestDropped) return
const t0 = Date.now()
let objs = []
const gzippedBuffer = Buffer.concat(buffer)
if (gzippedBuffer.length > MAX_FILE_SIZE) {
req.log.error(
`Upload error: Batch size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})`
)
if (!requestDropped)
res
.status(400)
.send(
`File size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})`
)
requestDropped = true
}
const gunzippedBuffer = zlib.gunzipSync(gzippedBuffer).toString()
const gunzippedBufferMegabyteSize =
estimateStringMegabyteSize(gunzippedBuffer)
if (gunzippedBufferMegabyteSize > MAX_FILE_SIZE) {
req.log.error(
`upload error: batch size too large (${gunzippedBufferMegabyteSize} > ${MAX_FILE_SIZE})`
)
if (!requestDropped)
res
.status(400)
.send(
`File size too large (${gunzippedBufferMegabyteSize} > ${MAX_FILE_SIZE})`
)
requestDropped = true
}
try {
objs = JSON.parse(gunzippedBuffer)
} catch {
req.log.error(`Upload error: Batch not in JSON format`)
if (!requestDropped) res.status(400).send('Failed to parse data.')
requestDropped = true
}
// last = objs[objs.length - 1]
totalProcessed += objs.length
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
previouslyAwaitedPromises = promises.length
await Promise.all(promises)
}
const promise = objectInsertionService(req.params.streamId, objs).catch(
(e) => {
req.log.error(e, `Upload error.`)
if (!requestDropped) {
switch (e.constructor) {
case ObjectHandlingError:
res
.status(400)
.send(`Error inserting object in the database: ${e.message}`)
break
default:
res
.status(400)
.send(
'Error inserting object in the database. Check server logs for details'
)
}
}
requestDropped = true
}
)
promises.push(promise)
await promise
req.log.info(
{
objectCount: objs.length,
durationSeconds: (Date.now() - t0) / 1000,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024,
uploadedSizeMB: gunzippedBuffer.length / 1000000,
requestDropped
},
'Uploaded batch of {objectCount} objects'
)
})
} else if (
mimeType === 'text/plain' ||
mimeType === 'application/json' ||
mimeType === 'application/octet-stream'
) {
let buffer = ''
file.on('data', (data) => {
if (data) buffer += data
})
file.on('end', async () => {
if (requestDropped) return
const t0 = Date.now()
let objs = []
if (buffer.length > MAX_FILE_SIZE) {
req.log.error(
`Upload error: Batch size too large (${buffer.length} > ${MAX_FILE_SIZE})`
)
if (!requestDropped)
res
.status(400)
.send(`File size too large (${buffer.length} > ${MAX_FILE_SIZE})`)
requestDropped = true
}
try {
objs = JSON.parse(buffer)
} catch {
req.log.error(`Upload error: Batch not in JSON format`)
if (!requestDropped)
res.status(400).send('Failed to parse data. Batch is not in JSON format.')
requestDropped = true
}
if (!Array.isArray(objs)) {
req.log.error(`Upload error: Batch not an array`)
if (!requestDropped)
res
.status(400)
.send(
'Failed to parse data. Batch is expected to be wrapped in a JSON array.'
)
requestDropped = true
}
// FIXME: should we exit here if requestDropped is true?
totalProcessed += objs.length
req.log.debug(
`total objects, including current pending batch, processed so far is ${totalProcessed}`
)
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
previouslyAwaitedPromises = promises.length
await Promise.all(promises)
}
const promise = objectInsertionService(req.params.streamId, objs).catch(
(e) => {
req.log.error(e, `Upload error.`)
if (!requestDropped)
switch (e.constructor) {
case ObjectHandlingError:
res
.status(400)
.send(`Error inserting object in the database. ${e.message}`)
break
default:
res
.status(400)
.send(
'Error inserting object in the database. Check server logs for details'
)
}
requestDropped = true
}
)
promises.push(promise)
await promise
req.log.info(
{
objectCount: objs.length,
uploadedSizeMB: estimateStringMegabyteSize(buffer),
durationSeconds: (Date.now() - t0) / 1000,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024,
requestDropped
},
'Uploaded batch of {objectCount} objects.'
)
})
} else {
req.log.info(`Invalid ContentType header: ${mimeType}`)
if (!requestDropped)
res
.status(400)
.send(
'Invalid ContentType header. This route only accepts "application/gzip", "text/plain" or "application/json".'
)
requestDropped = true
}
})
busboy.on('finish', async () => {
if (requestDropped) return
req.log.info(
{
totalProcessed,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024
},
'Upload finished: {totalProcessed} objects'
)
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
previouslyAwaitedPromises = promises.length
await Promise.all(promises)
}
res.status(201).end()
})
busboy.on('error', async (err) => {
req.log.info(`Upload error: ${err}`)
if (!requestDropped)
res.status(400).end('Upload request error. The server logs have more details')
requestDropped = true
})
req.pipe(busboy)
})
}
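For context, a hypothetical client-side call against the restored route (not part of this commit; serverUrl, streamId, and token are placeholders). The handler expects a multipart/form-data POST in which each file part is a JSON array of objects, sent as 'application/json', 'text/plain', or 'application/octet-stream', or gzipped as 'application/gzip'; it replies 201 once every batch inserts and 400 otherwise:

// Hypothetical usage sketch (Node 18+, global fetch/FormData/Blob).
const objects = [{ foo: 'bar' }, { baz: 'qux' }]

const form = new FormData()
form.append(
  'batch-1',
  new Blob([JSON.stringify(objects)], { type: 'application/json' }),
  'batch-1.json'
)

const res = await fetch(`${serverUrl}/objects/${streamId}`, {
  method: 'POST',
  headers: { Authorization: `Bearer ${token}` },
  body: form
})
console.log(res.status) // 201 on success, 400 for oversized or malformed batches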

View File

@@ -1,416 +0,0 @@
import zlib from 'zlib'
import { corsMiddleware } from '@/modules/core/configs/cors'
import Busboy from 'busboy'
import { validatePermissionsWriteStream } from '@/modules/core/rest/authUtils'
import {
getFeatureFlags,
maximumObjectUploadFileSizeMb
} from '@/modules/shared/helpers/envHelper'
import {
createObjectsBatched,
createObjectsBatchedAndNoClosures
} from '@/modules/core/services/objects'
import { ObjectHandlingError } from '@/modules/core/errors/object'
import { estimateStringMegabyteSize } from '@/modules/core/utils/chunking'
import { toMegabytesWith1DecimalPlace } from '@/modules/core/utils/formatting'
import { Logger } from 'pino'
import { Router } from 'express'
const MAX_FILE_SIZE = maximumObjectUploadFileSizeMb() * 1024 * 1024
const { FF_NO_CLOSURE_WRITES } = getFeatureFlags()
let objectInsertionService: (params: {
streamId: string
objects: unknown[]
logger?: Logger
}) => Promise<boolean | string[]> = createObjectsBatched
if (FF_NO_CLOSURE_WRITES) {
objectInsertionService = createObjectsBatchedAndNoClosures
}
export default (app: Router) => {
app.options('/objects/:streamId', corsMiddleware())
app.post('/objects/:streamId', corsMiddleware(), async (req, res) => {
const calculateLogMetadata = (params: {
batchSizeMb: number
start: number
batchStartTime: number
totalObjectsProcessed: number
}) => {
return {
batchSizeMb: params.batchSizeMb,
maxFileSizeMb: toMegabytesWith1DecimalPlace(MAX_FILE_SIZE),
elapsedTimeMs: Date.now() - params.start,
batchElapsedTimeMs: Date.now() - params.batchStartTime,
totalObjectsProcessed: params.totalObjectsProcessed
}
}
req.log = req.log.child({
userId: req.context.userId || '-',
streamId: req.params.streamId
})
const start = Date.now()
const hasStreamAccess = await validatePermissionsWriteStream(
req.params.streamId,
req
)
if (!hasStreamAccess.result) {
return res.status(hasStreamAccess.status).end()
}
let busboy
try {
busboy = Busboy({ headers: req.headers })
} catch (e) {
req.log.warn(
e,
'Failed to parse request headers and body content as valid multipart/form-data.'
)
return res
.status(400)
.send(
'Failed to parse request headers and body content as valid multipart/form-data.'
)
}
let totalObjectsProcessed = 0
const promises: Promise<boolean | void | string[]>[] = []
let requestDropped = false
busboy.on('file', (name, file, info) => {
const { mimeType } = info
if (requestDropped) return
if (mimeType === 'application/gzip') {
const buffer: Uint8Array[] = []
file.on('data', (data) => {
if (data) buffer.push(data)
})
file.on('end', async () => {
req.log.info(
`File upload of the multipart form has reached an end of file (EOF) boundary. The mimetype of the file is '${mimeType}'.`
)
if (requestDropped) return
const batchStartTime = Date.now()
let objs = []
const gzippedBuffer = Buffer.concat(buffer)
if (gzippedBuffer.length > MAX_FILE_SIZE) {
req.log.error(
calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(gzippedBuffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: Batch size too large ({batchSizeMb} > {maxFileSizeMb}). Error occurred after {elapsedTimeMs}ms. This batch took {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(400)
.send(
`File size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})`
)
requestDropped = true
}
const gunzippedBuffer = zlib.gunzipSync(gzippedBuffer).toString()
const gunzippedBufferMegabyteSize =
estimateStringMegabyteSize(gunzippedBuffer)
if (gunzippedBufferMegabyteSize > MAX_FILE_SIZE) {
req.log.error(
calculateLogMetadata({
batchSizeMb: gunzippedBufferMegabyteSize,
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: batch size too large ({batchSizeMb} > {maxFileSizeMb}). Error occurred after {elapsedTimeMs}ms. This batch took {batchElapsedTimeMs}ms. Total objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(400)
.send(
`File size too large (${gunzippedBufferMegabyteSize} > ${MAX_FILE_SIZE})`
)
requestDropped = true
}
try {
objs = JSON.parse(gunzippedBuffer)
} catch {
req.log.error(
calculateLogMetadata({
batchSizeMb: gunzippedBufferMegabyteSize,
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: Batch not in JSON format. Error occurred after {elapsedTimeMs}ms. This batch of objects took {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped) res.status(400).send('Failed to parse data.')
requestDropped = true
}
// last = objs[objs.length - 1]
totalObjectsProcessed += objs.length
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
previouslyAwaitedPromises = promises.length
await Promise.all(promises)
}
const promise = objectInsertionService({
streamId: req.params.streamId,
objects: objs,
logger: req.log
}).catch((e) => {
req.log.error(
{
...calculateLogMetadata({
batchSizeMb: gunzippedBufferMegabyteSize,
start,
batchStartTime,
totalObjectsProcessed
}),
objectCount: objs.length,
err: e
},
`Upload error when inserting objects into database. Number of objects: {objectCount}. This batch took {batchElapsedTimeMs}ms. Error occurred after {elapsedTimeMs}ms. Total objects processed before error: {totalObjectsProcessed}.`
)
if (!requestDropped) {
switch (e.constructor) {
case ObjectHandlingError:
res
.status(400)
.send(`Error inserting object in the database: ${e.message}`)
break
default:
res
.status(400)
.send(
'Error inserting object in the database. Check server logs for details'
)
}
}
requestDropped = true
})
promises.push(promise)
await promise
req.log.info(
{
objectCount: objs.length,
elapsedTimeMs: Date.now() - start,
batchElapsedTimeMs: Date.now() - batchStartTime,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024,
uploadedSizeMB: toMegabytesWith1DecimalPlace(gunzippedBuffer.length),
requestDropped,
totalObjectsProcessed
},
'Uploaded batch of {objectCount} objects in {batchElapsedTimeMs}ms. Total objects processed so far: {totalObjectsProcessed} in a total of {elapsedTimeMs}ms.'
)
})
} else if (
mimeType === 'text/plain' ||
mimeType === 'application/json' ||
mimeType === 'application/octet-stream'
) {
let buffer = ''
file.on('data', (data) => {
if (data) buffer += data
})
file.on('end', async () => {
if (requestDropped) return
const batchStartTime = Date.now()
let objs = []
if (buffer.length > MAX_FILE_SIZE) {
req.log.error(
calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: Batch size too large ({batchSizeMb} > {maxFileSizeMb}). Error occurred after {elapsedTimeMs}ms. This batch took {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(400)
.send(`File size too large (${buffer.length} > ${MAX_FILE_SIZE})`)
requestDropped = true
}
try {
objs = JSON.parse(buffer)
} catch {
req.log.error(
calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: Batch not in JSON format. Error occurred after {elapsedTimeMs}ms. This batch failed after {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res.status(400).send('Failed to parse data. Batch is not in JSON format.')
requestDropped = true
}
if (!Array.isArray(objs)) {
req.log.error(
calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: Batch not an array. Error occurred after {elapsedTimeMs}ms. This batch failed after {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(400)
.send(
'Failed to parse data. Batch is expected to be wrapped in a JSON array.'
)
requestDropped = true
}
// FIXME: should we exit here if requestDropped is true?
totalObjectsProcessed += objs.length
req.log.debug(
{
...calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
objectCount: objs.length
},
'Total objects, including current pending batch of {objectCount} objects, processed so far is {totalObjectsProcessed}. This batch has taken {batchElapsedTimeMs}ms. Total time elapsed is {elapsedTimeMs}ms.'
)
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
previouslyAwaitedPromises = promises.length
await Promise.all(promises)
}
const promise = objectInsertionService({
streamId: req.params.streamId,
objects: objs,
logger: req.log
}).catch((e) => {
req.log.error(
{
...calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
err: e
},
`Upload error when inserting objects into database. Number of objects: {objectCount}. This batch took {batchElapsedTimeMs}ms. Error occurred after {elapsedTimeMs}ms. Total objects processed before error: {totalObjectsProcessed}.`
)
if (!requestDropped)
switch (e.constructor) {
case ObjectHandlingError:
res
.status(400)
.send(`Error inserting object in the database. ${e.message}`)
break
default:
res
.status(400)
.send(
'Error inserting object in the database. Check server logs for details'
)
}
requestDropped = true
})
promises.push(promise)
await promise
req.log.info(
{
...calculateLogMetadata({
batchSizeMb: estimateStringMegabyteSize(buffer),
start,
batchStartTime,
totalObjectsProcessed
}),
objectCount: objs.length,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024
},
'Uploaded batch of {objectCount} objects. Total number of objects processed is {totalObjectsProcessed}. This batch took {batchElapsedTimeMs}ms.'
)
})
} else {
req.log.info(
{
mimeType,
totalObjectsProcessed
},
'Invalid ContentType header: {mimeType}. Total number of objects processed so far: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(400)
.send(
'Invalid ContentType header. This route only accepts "application/gzip", "text/plain" or "application/json".'
)
requestDropped = true
}
})
busboy.on('finish', async () => {
if (requestDropped) return
req.log.info(
{
totalObjectsProcessed,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024,
elapsedTimeMs: Date.now() - start
},
'Upload finished: {totalObjectsProcessed} objects processed in {elapsedTimeMs}ms'
)
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
previouslyAwaitedPromises = promises.length
await Promise.all(promises)
}
res.status(201).end()
})
busboy.on('error', async (err) => {
req.log.info(
{
err,
totalObjectsProcessed,
elapsedTimeMs: Date.now() - start,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024
},
'Error during upload. Error occurred after {elapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}. Error: {error}'
)
if (!requestDropped)
res.status(400).end('Upload request error. The server logs have more details')
requestDropped = true
})
req.pipe(busboy)
})
}
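Both the restored JavaScript route and this deleted TypeScript version rely on the same promise-draining pattern before replying: new 'file'/'end' events can append further batch insertions while earlier ones are being awaited, so the loop re-awaits until the promise list stops growing. Extracted for clarity (assumes the route's `promises` array is in scope):

// Keep awaiting until no new batch promises were appended while waiting.
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
  previouslyAwaitedPromises = promises.length
  await Promise.all(promises)
}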

View File

@@ -17,10 +17,9 @@ const Closures = () => knex('object_children_closure')
module.exports = {
/**
* @param {{streamId, object, logger?}} params
* @returns {Promise<string>}
*/
async createObject({ streamId, object, logger = servicesLogger }) {
async createObject(streamId, object) {
const insertionObject = prepInsertionObject(streamId, object)
const closures = []
@@ -58,12 +57,10 @@
}
}
logger.debug({ objectId: insertionObject.id }, 'Inserted object: {objectId}')
return insertionObject.id
},
async createObjectsBatched({ streamId, objects, logger = servicesLogger }) {
async createObjectsBatched(streamId, objects) {
const closures = []
const objsToInsert = []
const ids = []
@@ -115,7 +112,10 @@
for (const batch of batches) {
prepInsertionObjectBatch(batch)
await Objects().insert(batch).onConflict().ignore()
logger.info({ objectCount: batch.length }, 'Inserted {objectCount} objects')
servicesLogger.info(
{ objectCount: batch.length },
'Inserted {objectCount} objects'
)
}
}
@@ -126,17 +126,16 @@
for (const batch of batches) {
prepInsertionClosureBatch(batch)
await Closures().insert(batch).onConflict().ignore()
logger.info({ batchLength: batch.length }, 'Inserted {batchLength} closures')
servicesLogger.info(
{ batchLength: batch.length },
'Inserted {batchLength} closures'
)
}
}
return true
},
async createObjectsBatchedAndNoClosures({
streamId,
objects,
logger = servicesLogger
}) {
async createObjectsBatchedAndNoClosures(streamId, objects) {
const objsToInsert = []
const ids = []
@@ -160,7 +159,10 @@
for (const batch of batches) {
prepInsertionObjectBatch(batch)
await Objects().insert(batch).onConflict().ignore()
logger.info({ batchLength: batch.length }, 'Inserted {batchLength} objects.')
servicesLogger.info(
{ batchLength: batch.length },
'Inserted {batchLength} objects'
)
}
}
@@ -170,7 +172,7 @@
/**
* @returns {Promise<string[]>}
*/
async createObjects({ streamId, objects, logger = servicesLogger }) {
async createObjects(streamId, objects) {
// TODO: Switch to knex batch inserting functionality
// see http://knexjs.org/#Utility-BatchInsert
const batches = []
@@ -234,16 +236,12 @@
}
const t1 = performance.now()
logger.info(
{
batchIndex: index + 1,
totalCountOfBatches: batches.length,
countStoredObjects: closures.length + objsToInsert.length,
elapsedTimeMs: t1 - t0
},
'Batch {batchIndex}/{totalCountOfBatches}: Stored {countStoredObjects} objects in {elapsedTimeMs}ms.'
servicesLogger.info(
`Batch ${index + 1}/${batches.length}: Stored ${
closures.length + objsToInsert.length
} objects in ${t1 - t0}ms.`
)
// logger.debug( `Batch ${index + 1}/${batches.length}: Stored ${closures.length + objsToInsert.length} objects in ${t1-t0}ms.` )
}
const promises = batches.map((batch, index) => insertBatch(batch, index))

View File

@@ -47,7 +47,7 @@ describe('Branches @core-branches', () => {
user.id = await createUser(user)
stream.id = await createStream({ ...stream, ownerId: user.id })
testObject.id = await createObject({ streamId: stream.id, object: testObject })
testObject.id = await createObject(stream.id, testObject)
})
const branch = { name: 'dim/dev' }

View File

@@ -50,7 +50,7 @@ describe('Commits @core-commits', () => {
}
const generateObject = async (streamId = stream.id, object = testObject) =>
await createObject({ streamId, object })
await createObject(streamId, object)
const generateStream = async (streamBase = stream, ownerId = user.id) =>
await createStream({ ...streamBase, ownerId })
@@ -62,15 +62,9 @@ describe('Commits @core-commits', () => {
user.id = await createUser(user)
stream.id = await createStream({ ...stream, ownerId: user.id })
const testObjectId = await createObject({ streamId: stream.id, object: testObject })
const testObject2Id = await createObject({
streamId: stream.id,
object: testObject2
})
const testObject3Id = await createObject({
streamId: stream.id,
object: testObject3
})
const testObjectId = await createObject(stream.id, testObject)
const testObject2Id = await createObject(stream.id, testObject2)
const testObject3Id = await createObject(stream.id, testObject3)
commitId1 = await createCommitByBranchName({
streamId: stream.id,
@@ -218,7 +212,7 @@ describe('Commits @core-commits', () => {
for (let i = 0; i < 10; i++) {
const t = { qux: i }
t.id = await createObject({ streamId, object: t })
t.id = await createObject(streamId, t)
await createCommitByBranchName({
streamId,
branchName: 'main',
@@ -258,7 +252,7 @@ describe('Commits @core-commits', () => {
for (let i = 0; i < 15; i++) {
const t = { thud: i }
t.id = await createObject({ streamId, object: t })
t.id = await createObject(streamId, t)
await createCommitByBranchName({
streamId,
branchName: 'dim/dev',

View File

@@ -63,8 +63,8 @@ describe('Objects @core-objects', () => {
})
it('Should create objects', async () => {
sampleObject.id = await createObject({ streamId: stream.id, object: sampleObject })
sampleCommit.id = await createObject({ streamId: stream.id, object: sampleCommit })
sampleObject.id = await createObject(stream.id, sampleObject)
sampleCommit.id = await createObject(stream.id, sampleCommit)
})
const objCount_1 = 10
@@ -80,7 +80,7 @@ describe('Objects @core-objects', () => {
})
}
const ids = await createObjects({ streamId: stream.id, objects: objs })
const ids = await createObjects(stream.id, objs)
expect(ids).to.have.lengthOf(objCount_1)
}).timeout(30000)
@@ -109,7 +109,7 @@ describe('Objects @core-objects', () => {
})
}
const myIds = await createObjects({ streamId: stream.id, objects: objs2 })
const myIds = await createObjects(stream.id, objs2)
myIds.forEach((h, i) => (objs2[i].id = h))
@@ -127,7 +127,7 @@ describe('Objects @core-objects', () => {
return obj
}, {})
}
const id = await createObject({ streamId: stream.id, object: obj })
const id = await createObject(stream.id, obj)
expect(id).to.be.ok
})
@@ -156,16 +156,16 @@ describe('Objects @core-objects', () => {
it('Should get object children', async () => {
const objs_1 = createManyObjects(100, 'noise__')
const ids = await createObjects({ streamId: stream.id, objects: objs_1 })
const ids = await createObjects(stream.id, objs_1)
// console.log( ids )
// console.log(ids[ 0 ])
// The below are just performance benchmarking.
// let objs_2 = createManyObjects( 20000, 'noise_2' )
// let ids2 = await createObjects( {streamId: stream.id, objects: objs_2} )
// let ids2 = await createObjects( objs_2 )
// let objs_3 = createManyObjects( 100000, 'noise_3' )
// let ids3 = await createObjects( {streamId: stream.id, objects: objs_3} )
// let ids3 = await createObjects( objs_3 )
// let { rows } = await getObjectChildren( { objectId: ids[0], select: ['id', 'name', 'sortValueB'] } )
// let { rows } = await getObjectChildren( { objectId: ids[ 0 ] } )
@@ -494,7 +494,7 @@ describe('Objects @core-objects', () => {
const objs = createManyObjects(3333, 'perlin merlin magic')
commitId = objs[0].id
await createObjectsBatched({ streamId: stream.id, objects: objs })
await createObjectsBatched(stream.id, objs)
const parent = await getObject({ streamId: stream.id, objectId: commitId })
expect(parent.totalChildrenCount).to.equal(3333)
@@ -539,10 +539,7 @@ describe('Objects @core-objects', () => {
const promisses = []
for (let i = 0; i < shuffledVersions.length; i++) {
const promise = createObjectsBatched({
streamId: stream.id,
objects: shuffledVersions[i]
})
const promise = createObjectsBatched(stream.id, shuffledVersions[i])
promise.catch(() => {})
promisses.push(promise)
}

View File

@@ -341,10 +341,7 @@ describe('Streams @core-streams', () => {
it('Should update stream updatedAt on commit operations ', async () => {
const testObject = { foo: 'bar', baz: 'qux', id: '' }
testObject.id = await createObject({
streamId: updatableStream.id,
object: testObject
})
testObject.id = await createObject(updatableStream.id, testObject)
await createCommitByBranchName({
streamId: updatableStream.id,

View File

@@ -141,10 +141,7 @@ describe('Actors & Tokens @user-services', () => {
})
// create an object and a commit around it on the multiowner stream
const objId = await createObject({
streamId: multiOwnerStream.id,
object: { pie: 'in the sky' }
})
const objId = await createObject(multiOwnerStream.id, { pie: 'in the sky' })
const commitId = await createCommitByBranchName({
streamId: multiOwnerStream.id,
branchName: 'ballmer/dev',

View File

@@ -1,2 +0,0 @@
export const toMegabytesWith1DecimalPlace = (bytes: number) =>
Math.round((bytes * 10) / 1024 / 1024) / 10
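The deleted helper rounds a byte count to one decimal place of binary megabytes; two worked examples:

toMegabytesWith1DecimalPlace(52428800) // 50   (52428800 * 10 / 1024 / 1024 = 500; 500 / 10 = 50)
toMegabytesWith1DecimalPlace(1572864) // 1.5  (1572864 * 10 / 1024 / 1024 = 15; 15 / 10 = 1.5)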

View File

@@ -495,13 +495,10 @@ const createNewObject = async (
return
}
const newObjectId = await createObject({
streamId: targetStreamId,
object: {
...newObject,
id: newObject.id,
speckleType: newObject.speckleType || newObject.speckle_type || 'Base'
}
const newObjectId = await createObject(targetStreamId, {
...newObject,
id: newObject.id,
speckleType: newObject.speckleType || newObject.speckle_type || 'Base'
})
const newRecord = await getObject(newObjectId, targetStreamId)

View File

@@ -405,7 +405,3 @@ export function postgresMaxConnections() {
export function highFrequencyMetricsCollectionPeriodMs() {
return getIntFromEnv('HIGH_FREQUENCY_METRICS_COLLECTION_PERIOD_MS', '100')
}
export function maximumObjectUploadFileSizeMb() {
return getIntFromEnv('MAX_OBJECT_UPLOAD_FILE_SIZE_MB', '50')
}
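With this helper removed, the upload cap is no longer configurable: the restored JavaScript route hardcodes MAX_FILE_SIZE = 50 * 1024 * 1024, so MAX_OBJECT_UPLOAD_FILE_SIZE_MB has no effect after the revert (its Helm chart entries are removed below accordingly). A sketch of the difference, assuming the getIntFromEnv helper visible in this hunk:

// Before the revert: cap read from the environment, defaulting to 50 MB.
const configuredMaxBytes =
  getIntFromEnv('MAX_OBJECT_UPLOAD_FILE_SIZE_MB', '50') * 1024 * 1024
// After the revert: the restored route pins the same 50 MB default.
const MAX_FILE_SIZE = 50 * 1024 * 1024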

View File

@@ -230,10 +230,7 @@ async function seedDb({
const streamIds = await Promise.all(streamPromises)
// create objects
const objs = await createObjects({
streamId: streamIds[0],
objects: createManyObjects(numObjects - 1)
})
const objs = await createObjects(streamIds[0], createManyObjects(numObjects - 1))
// create commits referencing those objects
const commitPromises = []

View File

@@ -125,7 +125,6 @@
"@tiptap/core": "^2.0.0-beta.176",
"@types/bcrypt": "^5.0.0",
"@types/bull": "^3.15.9",
"@types/busboy": "^1.5.4",
"@types/chai-as-promised": "^7.1.8",
"@types/compression": "^1.7.2",
"@types/connect-redis": "^0.0.23",

View File

@@ -36,7 +36,7 @@ export type BasicTestCommit = {
}
export async function createTestObject(params: { projectId: string }) {
return await createObject({ streamId: params.projectId, object: { foo: 'bar' } })
return await createObject(params.projectId, { foo: 'bar' })
}
/**
@@ -46,9 +46,7 @@ async function ensureObjects(commits: BasicTestCommit[]) {
const commitsWithoutObjects = commits.filter((c) => !c.objectId)
await Promise.all(
commitsWithoutObjects.map((c) =>
createObject({ streamId: c.streamId, object: { foo: 'bar' } }).then(
(oid) => (c.objectId = oid)
)
createObject(c.streamId, { foo: 'bar' }).then((oid) => (c.objectId = oid))
)
)
}

View File

@@ -594,9 +594,6 @@ Generate the environment variables for Speckle server and Speckle objects deploy
- name: MAX_OBJECT_SIZE_MB
value: {{ .Values.server.max_object_size_mb | quote }}
- name: MAX_OBJECT_UPLOAD_FILE_SIZE_MB
value: {{ .Values.server.max_object_upload_file_size_mb | quote }}
{{- if .Values.server.migration.movedFrom }}
- name: MIGRATION_SERVER_MOVED_FROM
value: {{ .Values.server.migration.movedFrom }}

View File

@@ -551,11 +551,6 @@
"description": "The maximum size of an individual object which can be uploaded to the server",
"default": 10
},
"max_object_upload_file_size_mb": {
"type": "number",
"description": "Objects are batched together and uploaded to the /objects endpoint as http POST form data. This determines the maximum size of that form data which can be uploaded to the server",
"default": 50
},
"max_project_models_per_page": {
"type": "number",
"description": "The maximum number of models that can be returned in a single page of a query for all models of a project",

View File

@@ -423,8 +423,6 @@ server:
## @param server.max_object_size_mb The maximum size of an individual object which can be uploaded to the server
max_object_size_mb: 10
## @param server.max_object_upload_file_size_mb Objects are batched together and uploaded to the /objects endpoint as http POST form data. This determines the maximum size of that form data which can be uploaded to the server
max_object_upload_file_size_mb: 50
## @param server.max_project_models_per_page The maximum number of models that can be returned in a single page of a query for all models of a project
max_project_models_per_page: 500
## @param server.speckleAutomateUrl The url of the Speckle Automate instance

View File

@@ -15398,7 +15398,6 @@ __metadata:
"@tiptap/core": "npm:^2.0.0-beta.176"
"@types/bcrypt": "npm:^5.0.0"
"@types/bull": "npm:^3.15.9"
"@types/busboy": "npm:^1.5.4"
"@types/chai-as-promised": "npm:^7.1.8"
"@types/compression": "npm:^1.7.2"
"@types/connect-redis": "npm:^0.0.23"
@@ -17891,15 +17890,6 @@
languageName: node
linkType: hard
"@types/busboy@npm:^1.5.4":
version: 1.5.4
resolution: "@types/busboy@npm:1.5.4"
dependencies:
"@types/node": "npm:*"
checksum: 10/43cdd26754603fbee81f538ac52769f2cc8445d5f238666845d99a9fee22e0b608a075d0c346f78c43ade4ce4ec04433a51a1ffa21524ca29ead9d2375f4ec9c
languageName: node
linkType: hard
"@types/cacheable-request@npm:^6.0.1":
version: 6.0.3
resolution: "@types/cacheable-request@npm:6.0.3"