* feat(webhook-service): learn to speak multi region

* feat(fileimport-service): talk multi region to me

* feat(fileuploads, blobs): multi region

* feat(fileimport-service): multi region fixes

* feat(branchesAndCommits): multi region resolvers

* fix(fileimports): no need for ts ignore

* fix(fileimports): fix pr comments
This commit is contained in:
Gergő Jedlicska 2024-11-13 14:20:25 +01:00 коммит произвёл GitHub
Родитель 8c21f1e8af
Коммит 68b8341945
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
16 изменённых файлов: 414 добавлений и 271 удалений

Просмотреть файл

@ -5,11 +5,13 @@
"version": "0.2.0",
"configurations": [
{
"name": "Launch via NPM",
"name": "Launch via Yarn",
"request": "launch",
"runtimeArgs": ["run-script", "dev"],
"runtimeExecutable": "npm",
"console": "integratedTerminal",
"runtimeArgs": ["dev"],
"runtimeExecutable": "yarn",
"skipFiles": ["<node_internals>/**"],
"envFile": "${workspaceFolder}/.env",
"type": "node"
}
]

Просмотреть файл

@ -5,12 +5,13 @@ const TMP_RESULTS_PATH = '/tmp/import_result.json'
const { parseAndCreateCommitFactory } = require('./index')
const Observability = require('@speckle/shared/dist/commonjs/observability/index.js')
const knex = require('../knex')
const getDbClients = require('../knex')
async function main() {
const cmdArgs = process.argv.slice(2)
const [filePath, userId, streamId, branchName, commitMessage, fileId] = cmdArgs
const [filePath, userId, streamId, branchName, commitMessage, fileId, regionName] =
cmdArgs
const logger = Observability.extendLoggerComponent(
parentLogger.child({ streamId, branchName, userId, fileId, filePath }),
'ifc'
@ -35,6 +36,8 @@ async function main() {
error: 'Unknown error'
}
const dbClients = await getDbClients()
const knex = dbClients[regionName].public
try {
const commitId = await parseAndCreateCommitFactory({ db: knex })(ifcInput)
output = {

Просмотреть файл

@ -1,18 +1,52 @@
/* eslint-disable camelcase */
'use strict'
module.exports = require('knex')({
client: 'pg',
connection: {
application_name: 'speckle_fileimport_service',
connectionString:
process.env.PG_CONNECTION_STRING || 'postgres://speckle:speckle@127.0.0.1/speckle'
},
pool: {
min: 0,
max: parseInt(process.env.POSTGRES_MAX_CONNECTIONS_FILE_IMPORT_SERVICE) || 1,
acquireTimeoutMillis: 16000, //allows for 3x creation attempts plus idle time between attempts
createTimeoutMillis: 5000
const Environment = require('@speckle/shared/dist/commonjs/environment/index.js')
const {
loadMultiRegionsConfig,
configureKnexClient
} = require('@speckle/shared/dist/commonjs/environment/multiRegionConfig.js')
const { logger } = require('./observability/logging')
const { FF_WORKSPACES_MULTI_REGION_ENABLED } = Environment.getFeatureFlags()
const isDevEnv = process.env.NODE_ENV !== 'production'
let dbClients
const getDbClients = async () => {
if (dbClients) return dbClients
const maxConnections =
parseInt(process.env.POSTGRES_MAX_CONNECTIONS_FILE_IMPORT_SERVICE) || 1
const configArgs = {
migrationDirs: [],
isTestEnv: isDevEnv,
isDevOrTestEnv: isDevEnv,
logger,
maxConnections,
applicationName: 'speckle_fileimport_service'
}
// migrations are in managed in the server package
})
if (!FF_WORKSPACES_MULTI_REGION_ENABLED) {
const mainClient = configureKnexClient(
{
postgres: {
connectionUri:
process.env.PG_CONNECTION_STRING ||
'postgres://speckle:speckle@127.0.0.1/speckle'
}
},
configArgs
)
dbClients = { main: mainClient }
} else {
const configPath = process.env.MULTI_REGION_CONFIG_PATH || 'multiregion.json'
const config = await loadMultiRegionsConfig({ path: configPath })
const clients = [['main', configureKnexClient(config.main, configArgs)]]
Object.entries(config.regions).map(([key, config]) => {
clients.push([key, configureKnexClient(config, configArgs)])
})
dbClients = Object.fromEntries(clients)
}
return dbClients
}
module.exports = getDbClients

Просмотреть файл

@ -159,6 +159,8 @@ if __name__ == "__main__":
commit_id = import_obj()
if not commit_id:
raise Exception("Can't create commit")
if isinstance(commit_id, Exception):
raise commit_id
results = {"success": True, "commitId": commit_id}
except Exception as ex:
LOG.exception(ex)

Просмотреть файл

@ -3,10 +3,10 @@
"private": true,
"version": "2.5.4",
"description": "Parse and import files of various types into a stream",
"author": "Dimitrie Stefanescu <didimitrie@gmail.com>",
"author": "Speckle Systems <hello@speckle.systems>",
"homepage": "https://github.com/specklesystems/speckle-server#readme",
"license": "SEE LICENSE IN readme.md",
"main": "index.js",
"main": "daemon.js",
"repository": {
"type": "git",
"url": "git+https://github.com/specklesystems/speckle-server.git"

Просмотреть файл

@ -6,8 +6,7 @@ const {
metricInputFileSize,
metricOperationErrors
} = require('./prometheusMetrics')
const knex = require('../knex')
const FileUploads = () => knex('file_uploads')
const getDbClients = require('../knex')
const { downloadFile } = require('./filesApi')
const fs = require('fs')
@ -16,7 +15,7 @@ const { spawn } = require('child_process')
const ServerAPI = require('../ifc/api')
const objDependencies = require('./objDependencies')
const { logger } = require('../observability/logging')
const { Scopes } = require('@speckle/shared')
const { Scopes, wait } = require('@speckle/shared')
const HEALTHCHECK_FILE_PATH = '/tmp/last_successful_query'
@ -31,7 +30,7 @@ let TIME_LIMIT = 10 * 60 * 1000
const providedTimeLimit = parseInt(process.env.FILE_IMPORT_TIME_LIMIT_MIN)
if (providedTimeLimit) TIME_LIMIT = providedTimeLimit * 60 * 1000
async function startTask() {
async function startTask(knex) {
const { rows } = await knex.raw(`
UPDATE file_uploads
SET
@ -49,15 +48,16 @@ async function startTask() {
return rows[0]
}
async function doTask(task) {
async function doTask(mainDb, regionName, taskDb, task) {
const taskId = task.id
// Mark task as started
await knex.raw(`NOTIFY file_import_started, '${task.id}'`)
await mainDb.raw(`NOTIFY file_import_started, '${task.id}'`)
let taskLogger = logger.child({ taskId })
let tempUserToken = null
let serverApi = null
let mainServerApi = null
let taskServerApi = null
let fileTypeForMetric = 'unknown'
let fileSizeForMetric = 0
@ -67,7 +67,7 @@ async function doTask(task) {
try {
taskLogger.info("Doing task '{taskId}'.")
const info = await FileUploads().where({ id: taskId }).first()
const info = await taskDb('file_uploads').where({ id: taskId }).first()
if (!info) {
throw new Error('Internal error: DB inconsistent')
}
@ -85,13 +85,22 @@ async function doTask(task) {
})
fs.mkdirSync(TMP_INPUT_DIR, { recursive: true })
serverApi = new ServerAPI({ db: knex, streamId: info.streamId, logger: taskLogger })
mainServerApi = new ServerAPI({
db: mainDb,
streamId: info.streamId,
logger: taskLogger
})
taskServerApi = new ServerAPI({
db: taskDb,
streamId: info.streamId,
logger: taskLogger
})
branchMetadata = {
branchName: info.branchName,
streamId: info.streamId
}
const existingBranch = await serverApi.getBranchByNameAndStreamId({
const existingBranch = await taskServerApi.getBranchByNameAndStreamId({
streamId: info.streamId,
name: info.branchName
})
@ -99,7 +108,7 @@ async function doTask(task) {
newBranchCreated = true
}
const { token } = await serverApi.createToken({
const { token } = await mainServerApi.createToken({
userId: info.userId,
name: 'temp upload token',
scopes: [Scopes.Streams.Write, Scopes.Streams.Read],
@ -126,7 +135,8 @@ async function doTask(task) {
info.streamId,
info.branchName,
`File upload: ${info.fileName}`,
info.id
info.id,
regionName
],
{
USER_TOKEN: tempUserToken
@ -185,7 +195,7 @@ async function doTask(task) {
const commitId = output.commitId
await knex.raw(
await taskDb.raw(
`
UPDATE file_uploads
SET
@ -199,7 +209,7 @@ async function doTask(task) {
)
} catch (err) {
taskLogger.error(err)
await knex.raw(
await taskDb.raw(
`
UPDATE file_uploads
SET
@ -208,12 +218,13 @@ async function doTask(task) {
"convertedMessage" = ?
WHERE "id" = ?
`,
[err.toString(), task.id]
// DB only accepts a varchar 255
[err.toString().substring(0, 254), task.id]
)
metricOperationErrors.labels(fileTypeForMetric).inc()
} finally {
const { streamId, branchName } = branchMetadata
await knex.raw(
await mainDb.raw(
`NOTIFY file_import_update, '${task.id}:::${streamId}:::${branchName}:::${
newBranchCreated ? 1 : 0
}'`
@ -226,7 +237,7 @@ async function doTask(task) {
if (fs.existsSync(TMP_RESULTS_PATH)) fs.unlinkSync(TMP_RESULTS_PATH)
if (tempUserToken) {
await serverApi.revokeTokenById(tempUserToken)
await mainServerApi.revokeTokenById(tempUserToken)
}
}
@ -305,42 +316,53 @@ function wrapLogLine(line, isErr, logger) {
logger.info({ parserLogLine: line }, 'ParserLog: {parserLogLine}')
}
async function tick() {
if (shouldExit) {
process.exit(0)
}
try {
const task = await startTask()
fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {})
if (!task) {
setTimeout(tick, 1000)
return
const doStuff = async () => {
const dbClients = await getDbClients()
const mainDb = dbClients.main.public
const dbClientsIterator = infiniteDbClientsIterator(dbClients)
while (!shouldExit) {
const [regionName, taskDb] = dbClientsIterator.next().value
try {
const task = await startTask(taskDb)
fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {})
if (!task) {
await wait(1000)
continue
}
await doTask(mainDb, regionName, taskDb, task)
await wait(10)
} catch (err) {
metricOperationErrors.labels('main_loop').inc()
logger.error(err, 'Error executing task')
await wait(5000)
}
await doTask(task)
// Check for another task very soon
setTimeout(tick, 10)
} catch (err) {
metricOperationErrors.labels('main_loop').inc()
logger.error(err, 'Error executing task')
setTimeout(tick, 5000)
}
}
async function main() {
logger.info('Starting FileUploads Service...')
initPrometheusMetrics()
await initPrometheusMetrics()
process.on('SIGTERM', () => {
shouldExit = true
logger.info('Shutting down...')
})
tick()
await doStuff()
process.exit(0)
}
function* infiniteDbClientsIterator(dbClients) {
let index = 0
const dbClientEntries = [...Object.entries(dbClients)]
const clientCount = dbClientEntries.length
while (true) {
// reset index
if (index === clientCount) index = 0
const [regionName, dbConnection] = dbClientEntries[index]
index++
yield [regionName, dbConnection.public]
}
}
main()

Просмотреть файл

@ -3,7 +3,7 @@
const http = require('http')
const prometheusClient = require('prom-client')
const knex = require('../knex')
const getDbClients = require('../knex')
let metricFree = null
let metricUsed = null
@ -116,11 +116,13 @@ const initDBPrometheusMetricsFactory =
}
module.exports = {
initPrometheusMetrics() {
async initPrometheusMetrics() {
if (prometheusInitialized) return
prometheusInitialized = true
initDBPrometheusMetricsFactory({ db: knex })()
const db = (await getDbClients()).main.public
initDBPrometheusMetricsFactory({ db })()
// Define the HTTP server
const server = http.createServer(async (req, res) => {

Просмотреть файл

@ -69,6 +69,8 @@ if __name__ == "__main__":
try:
commit_id = import_stl()
if isinstance(commit_id, Exception):
raise commit_id
results = {"success": True, "commitId": commit_id}
except Exception as ex:
results = {"success": False, "error": str(ex)}

Просмотреть файл

@ -1,4 +1,3 @@
import { db } from '@/db/knex'
import {
blobCollectionSummaryFactory,
getBlobMetadataCollectionFactory,
@ -13,19 +12,24 @@ import {
StreamBlobsArgs
} from '@/modules/core/graph/generated/graphql'
import { StreamGraphQLReturn } from '@/modules/core/helpers/graphTypes'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import {
BadRequestError,
NotFoundError,
ResourceMismatch
} from '@/modules/shared/errors'
const getBlobMetadata = getBlobMetadataFactory({ db })
const getBlobMetadataCollection = getBlobMetadataCollectionFactory({ db })
const blobCollectionSummary = blobCollectionSummaryFactory({ db })
const streamBlobResolvers = {
async blobs(parent: StreamGraphQLReturn, args: StreamBlobsArgs | ProjectBlobsArgs) {
const streamId = parent.id
const projectDb = await getProjectDbClient({ projectId: parent.id })
const blobCollectionSummary = blobCollectionSummaryFactory({ db: projectDb })
const getBlobMetadataCollection = getBlobMetadataCollectionFactory({
db: projectDb
})
const [summary, blobs] = await Promise.all([
blobCollectionSummary({
streamId,
@ -46,6 +50,8 @@ const streamBlobResolvers = {
}
},
async blob(parent: StreamGraphQLReturn, args: StreamBlobArgs | ProjectBlobArgs) {
const projectDb = await getProjectDbClient({ projectId: parent.id })
const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })
try {
return await getBlobMetadata({
streamId: parent.id,

Просмотреть файл

@ -48,27 +48,8 @@ import { getStreamFactory } from '@/modules/core/repositories/streams'
import { Request, Response } from 'express'
import { ensureError } from '@speckle/shared'
import { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
const getStream = getStreamFactory({ db })
const getAllStreamBlobIds = getAllStreamBlobIdsFactory({ db })
const updateBlob = updateBlobFactory({ db })
const uploadFileStream = uploadFileStreamFactory({
upsertBlob: upsertBlobFactory({ db }),
updateBlob
})
const getBlobMetadata = getBlobMetadataFactory({ db })
const getBlobMetadataCollection = getBlobMetadataCollectionFactory({ db })
const getFileStream = getFileStreamFactory({ getBlobMetadata })
const markUploadSuccess = markUploadSuccessFactory({ getBlobMetadata, updateBlob })
const markUploadError = markUploadErrorFactory({ getBlobMetadata, updateBlob })
const markUploadOverFileSizeLimit = markUploadOverFileSizeLimitFactory({
getBlobMetadata,
updateBlob
})
const deleteBlob = fullyDeleteBlobFactory({
getBlobMetadata,
deleteBlob: deleteBlobFactory({ db })
})
import { Knex } from 'knex'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
const ensureConditions = async () => {
if (process.env.DISABLE_FILE_UPLOADS) {
@ -108,25 +89,30 @@ const errorHandler: ErrorHandler = async (req, res, callback) => {
export const init: SpeckleModule['init'] = async (app) => {
await ensureConditions()
const streamWritePermissions = streamWritePermissionsPipelineFactory({
getRoles: getRolesFactory({ db }),
getStream,
getAutomationProject: getAutomationProjectFactory({ db })
})
const streamReadPermissions = streamReadPermissionsPipelineFactory({
adminOverrideEnabled,
getRoles: getRolesFactory({ db }),
getStream,
getAutomationProject: getAutomationProjectFactory({ db })
})
const createStreamWritePermissions = ({ projectDb }: { projectDb: Knex }) =>
streamWritePermissionsPipelineFactory({
getRoles: getRolesFactory({ db }),
getStream: getStreamFactory({ db }),
getAutomationProject: getAutomationProjectFactory({ db: projectDb })
})
const createStreamReadPermissions = ({ projectDb }: { projectDb: Knex }) =>
streamReadPermissionsPipelineFactory({
adminOverrideEnabled,
getRoles: getRolesFactory({ db }),
getStream: getStreamFactory({ db }),
getAutomationProject: getAutomationProjectFactory({ db: projectDb })
})
app.post(
'/api/stream/:streamId/blob',
authMiddlewareCreator([
...streamWritePermissions,
// todo should we add public comments upload escape hatch?
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments
]),
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator([
...createStreamWritePermissions({ projectDb }),
// todo should we add public comments upload escape hatch?
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments
])(req, res, next)
},
async (req, res) => {
const streamId = req.params.streamId
req.log = req.log.child({ streamId, userId: req.context.userId })
@ -144,6 +130,26 @@ export const init: SpeckleModule['init'] = async (app) => {
limits: { fileSize: getFileSizeLimit() }
})
const projectDb = await getProjectDbClient({ projectId: streamId })
const updateBlob = updateBlobFactory({ db: projectDb })
const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })
const uploadFileStream = uploadFileStreamFactory({
upsertBlob: upsertBlobFactory({ db: projectDb }),
updateBlob
})
const markUploadSuccess = markUploadSuccessFactory({
getBlobMetadata,
updateBlob
})
const markUploadError = markUploadErrorFactory({ getBlobMetadata, updateBlob })
const markUploadOverFileSizeLimit = markUploadOverFileSizeLimitFactory({
getBlobMetadata,
updateBlob
})
busboy.on('file', (formKey, file, info) => {
const { filename: fileName } = info
const fileType = fileName?.split('.')?.pop()?.toLowerCase()
@ -236,12 +242,15 @@ export const init: SpeckleModule['init'] = async (app) => {
app.post(
'/api/stream/:streamId/blob/diff',
authMiddlewareCreator([
...streamReadPermissions,
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments,
allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
allowAnonymousUsersOnPublicStreams
]),
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator([
...createStreamReadPermissions({ projectDb }),
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments,
allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
allowAnonymousUsersOnPublicStreams
])(req, res, next)
},
async (req, res) => {
if (!isArray(req.body)) {
return res
@ -249,6 +258,9 @@ export const init: SpeckleModule['init'] = async (app) => {
.json({ error: 'An array of blob IDs expected in the body.' })
}
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const getAllStreamBlobIds = getAllStreamBlobIdsFactory({ db: projectDb })
const bq = await getAllStreamBlobIds({ streamId: req.params.streamId })
const unknownBlobIds = [...req.body].filter(
(id) => bq.findIndex((bInfo) => bInfo.id === id) === -1
@ -259,14 +271,21 @@ export const init: SpeckleModule['init'] = async (app) => {
app.get(
'/api/stream/:streamId/blob/:blobId',
authMiddlewareCreator([
...streamReadPermissions,
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments,
allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
allowAnonymousUsersOnPublicStreams
]),
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator([
...createStreamReadPermissions({ projectDb }),
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments,
allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
allowAnonymousUsersOnPublicStreams
])(req, res, next)
},
async (req, res) => {
errorHandler(req, res, async (req, res) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })
const getFileStream = getFileStreamFactory({ getBlobMetadata })
const { fileName } = await getBlobMetadata({
streamId: req.params.streamId,
blobId: req.params.blobId
@ -287,9 +306,22 @@ export const init: SpeckleModule['init'] = async (app) => {
app.delete(
'/api/stream/:streamId/blob/:blobId',
authMiddlewareCreator(streamWritePermissions),
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator(createStreamReadPermissions({ projectDb }))(
req,
res,
next
)
},
async (req, res) => {
errorHandler(req, res, async (req, res) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })
const deleteBlob = fullyDeleteBlobFactory({
getBlobMetadata,
deleteBlob: deleteBlobFactory({ db: projectDb })
})
await deleteBlob({
streamId: req.params.streamId,
blobId: req.params.blobId,
@ -302,13 +334,24 @@ export const init: SpeckleModule['init'] = async (app) => {
app.get(
'/api/stream/:streamId/blobs',
authMiddlewareCreator(streamWritePermissions),
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator(createStreamReadPermissions({ projectDb }))(
req,
res,
next
)
},
async (req, res) => {
let fileName = req.query.fileName
if (isArray(fileName)) {
fileName = fileName[0]
}
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const getBlobMetadataCollection = getBlobMetadataCollectionFactory({
db: projectDb
})
errorHandler(req, res, async (req, res) => {
const blobMetadataCollection = await getBlobMetadataCollection({
streamId: req.params.streamId,

Просмотреть файл

@ -33,8 +33,6 @@ import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { filteredSubscribe, publish } from '@/modules/shared/utils/subscriptions'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
const getUser = legacyGetUserFactory({ db })
export = {
Query: {},
Stream: {
@ -72,6 +70,7 @@ export = {
},
Branch: {
async author(parent, _args, context) {
const getUser = legacyGetUserFactory({ db })
if (parent.authorId && context.auth) return await getUser(parent.authorId)
else return null
}

Просмотреть файл

@ -76,14 +76,14 @@ import { validateStreamAccessFactory } from '@/modules/core/services/streams/acc
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { Resolvers } from '@/modules/core/graph/generated/graphql'
import { CommitGraphQLReturn } from '@/modules/core/helpers/graphTypes'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
const markCommitStreamUpdated = markCommitStreamUpdatedFactory({ db })
const getCommitStream = getCommitStreamFactory({ db })
const getStream = getStreamFactory({ db })
const getStreams = getStreamsFactory({ db })
const deleteCommitAndNotify = deleteCommitAndNotifyFactory({
getCommit: getCommitFactory({ db }),
markCommitStreamUpdated,
markCommitStreamUpdated: markCommitStreamUpdatedFactory({ db }),
markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db }),
deleteCommit: deleteCommitFactory({ db }),
addCommitDeletedActivity: addCommitDeletedActivityFactory({
@ -92,28 +92,6 @@ const deleteCommitAndNotify = deleteCommitAndNotifyFactory({
})
})
const getObject = getObjectFactory({ db })
const createCommitByBranchId = createCommitByBranchIdFactory({
createCommit: createCommitFactory({ db }),
getObject,
getBranchById: getBranchByIdFactory({ db }),
insertStreamCommits: insertStreamCommitsFactory({ db }),
insertBranchCommits: insertBranchCommitsFactory({ db }),
markCommitStreamUpdated,
markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db }),
versionsEventEmitter: VersionsEmitter.emit,
addCommitCreatedActivity: addCommitCreatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
const createCommitByBranchName = createCommitByBranchNameFactory({
createCommitByBranchId,
getStreamBranchByName: getStreamBranchByNameFactory({ db }),
getBranchById: getBranchByIdFactory({ db })
})
const updateCommitAndNotify = updateCommitAndNotifyFactory({
getCommit: getCommitFactory({ db }),
getStream,
@ -126,7 +104,7 @@ const updateCommitAndNotify = updateCommitAndNotifyFactory({
saveActivity: saveActivityFactory({ db }),
publish
}),
markCommitStreamUpdated,
markCommitStreamUpdated: markCommitStreamUpdatedFactory({ db }),
markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db })
})
@ -311,6 +289,7 @@ export = {
},
Mutation: {
async commitCreate(_parent, args, context) {
const projectDb = await getProjectDbClient({ projectId: args.commit.streamId })
await authorizeResolver(
context.userId,
args.commit.streamId,
@ -323,6 +302,27 @@ export = {
throw new RateLimitError(rateLimitResult)
}
const createCommitByBranchId = createCommitByBranchIdFactory({
createCommit: createCommitFactory({ db: projectDb }),
getObject: getObjectFactory({ db: projectDb }),
getBranchById: getBranchByIdFactory({ db: projectDb }),
insertStreamCommits: insertStreamCommitsFactory({ db: projectDb }),
insertBranchCommits: insertBranchCommitsFactory({ db: projectDb }),
markCommitStreamUpdated: markCommitStreamUpdatedFactory({ db: projectDb }),
markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db: projectDb }),
versionsEventEmitter: VersionsEmitter.emit,
addCommitCreatedActivity: addCommitCreatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
const createCommitByBranchName = createCommitByBranchNameFactory({
createCommitByBranchId,
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
getBranchById: getBranchByIdFactory({ db: projectDb })
})
const { id } = await createCommitByBranchName({
...args.commit,
parents: args.commit.parents?.filter(isNonNullable),

Просмотреть файл

@ -11,30 +11,35 @@ import {
FileImportSubscriptions,
filteredSubscribe
} from '@/modules/shared/utils/subscriptions'
import { db } from '@/db/knex'
const getFileInfo = getFileInfoFactory({ db })
const getStreamFileUploads = getStreamFileUploadsFactory({ db })
const getStreamPendingModels = getStreamPendingModelsFactory({ db })
const getBranchPendingVersions = getBranchPendingVersionsFactory({ db })
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
export = {
Stream: {
async fileUploads(parent) {
return await getStreamFileUploads({ streamId: parent.id })
const projectDb = await getProjectDbClient({ projectId: parent.id })
return await getStreamFileUploadsFactory({ db: projectDb })({
streamId: parent.id
})
},
async fileUpload(_parent, args) {
return await getFileInfo({ fileId: args.id })
async fileUpload(parent, args) {
const projectDb = await getProjectDbClient({ projectId: parent.id })
return await getFileInfoFactory({ db: projectDb })({ fileId: args.id })
}
},
Project: {
async pendingImportedModels(parent, args) {
return await getStreamPendingModels(parent.id, args)
const projectDb = await getProjectDbClient({ projectId: parent.id })
return await getStreamPendingModelsFactory({ db: projectDb })(parent.id, args)
}
},
Model: {
async pendingImportedVersions(parent, args) {
return await getBranchPendingVersions(parent.streamId, parent.name, args)
const projectDb = await getProjectDbClient({ projectId: parent.streamId })
return await getBranchPendingVersionsFactory({ db: projectDb })(
parent.streamId,
parent.name,
args
)
}
},
FileUpload: {
@ -42,8 +47,10 @@ export = {
modelName: (parent) => parent.branchName,
convertedVersionId: (parent) => parent.convertedCommitId,
async model(parent, _args, ctx) {
return await ctx.loaders.streams.getStreamBranchByName
.forStream(parent.streamId)
const projectDb = await getProjectDbClient({ projectId: parent.streamId })
return await ctx.loaders
.forRegion({ db: projectDb })
.streams.getStreamBranchByName.forStream(parent.streamId)
.load(parent.branchName.toLowerCase())
}
},

Просмотреть файл

@ -3,7 +3,11 @@ import { insertNewUploadAndNotifyFactory } from '@/modules/fileuploads/services/
import request from 'request'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import { moduleLogger } from '@/logging/logging'
import { listenForImportUpdatesFactory } from '@/modules/fileuploads/services/resultListener'
import {
onFileImportProcessedFactory,
onFileProcessingFactory,
parseMessagePayload
} from '@/modules/fileuploads/services/resultListener'
import {
getFileInfoFactory,
saveUploadFileFactory
@ -19,43 +23,8 @@ import { getStreamFactory } from '@/modules/core/repositories/streams'
import { addBranchCreatedActivityFactory } from '@/modules/activitystream/services/branchActivity'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { getPort } from '@/modules/shared/helpers/envHelper'
const insertNewUploadAndNotify = insertNewUploadAndNotifyFactory({
getStreamBranchByName: getStreamBranchByNameFactory({ db }),
saveUploadFile: saveUploadFileFactory({ db }),
publish
})
const getStream = getStreamFactory({ db })
const saveFileUploads = async ({
userId,
streamId,
branchName,
uploadResults
}: {
userId: string
streamId: string
branchName: string
uploadResults: Array<{
blobId: string
fileName: string
fileSize: number
}>
}) => {
await Promise.all(
uploadResults.map(async (upload) => {
await insertNewUploadAndNotify({
fileId: upload.blobId,
streamId,
branchName,
userId,
fileName: upload.fileName,
fileType: upload.fileName.split('.').pop()!,
fileSize: upload.fileSize
})
})
)
}
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { listenFor } from '@/modules/core/utils/dbNotificationListener'
export const init: SpeckleModule['init'] = async (app, isInitial) => {
if (process.env.DISABLE_FILE_UPLOADS) {
@ -67,13 +36,16 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => {
app.post(
'/api/file/:fileType/:streamId/:branchName?',
authMiddlewareCreator(
streamWritePermissionsPipelineFactory({
getRoles: getRolesFactory({ db }),
getStream,
getAutomationProject: getAutomationProjectFactory({ db })
})
),
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator(
streamWritePermissionsPipelineFactory({
getRoles: getRolesFactory({ db }),
getStream: getStreamFactory({ db }),
getAutomationProject: getAutomationProjectFactory({ db: projectDb })
})
)(req, res, next)
},
async (req, res) => {
const branchName = req.params.branchName || 'main'
req.log = req.log.child({
@ -81,54 +53,102 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => {
userId: req.context.userId,
branchName
})
req.pipe(
//TODO refactor packages/server/modules/blobstorage/index.js to use the service pattern, and then refactor this to call the service directly from here without the http overhead
request(
// we call this same server on localhost (IPv4) to upload the blob and do not make an external call
`http://127.0.0.1:${getPort()}/api/stream/${req.params.streamId}/blob`,
async (err, response, body) => {
if (err) {
res.log.error(err, 'Error while uploading blob.')
res.status(500).send(err.message)
return
}
if (response.statusCode === 201) {
const { uploadResults } = JSON.parse(body)
await saveFileUploads({
userId: req.context.userId!,
streamId: req.params.streamId,
branchName,
uploadResults
})
} else {
res.log.error(
{
statusCode: response.statusCode,
path: `http://127.0.0.1:${getPort()}/api/stream/${
req.params.streamId
}/blob`
},
'Error while uploading file.'
)
}
res.status(response.statusCode).send(body)
}
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const insertNewUploadAndNotify = insertNewUploadAndNotifyFactory({
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
saveUploadFile: saveUploadFileFactory({ db: projectDb }),
publish
})
const saveFileUploads = async ({
userId,
streamId,
branchName,
uploadResults
}: {
userId: string
streamId: string
branchName: string
uploadResults: Array<{
blobId: string
fileName: string
fileSize: number
}>
}) => {
await Promise.all(
uploadResults.map(async (upload) => {
await insertNewUploadAndNotify({
fileId: upload.blobId,
streamId,
branchName,
userId,
fileName: upload.fileName,
fileType: upload.fileName.split('.').pop()!,
fileSize: upload.fileSize
})
})
)
}
//TODO refactor packages/server/modules/blobstorage/index.js to use the service pattern, and then refactor this to call the service directly from here without the http overhead
const pipedReq = request(
// we call this same server on localhost (IPv4) to upload the blob and do not make an external call
`http://127.0.0.1:${getPort()}/api/stream/${req.params.streamId}/blob`,
async (err, response, body) => {
if (err) {
res.log.error(err, 'Error while uploading blob.')
res.status(500).send(err.message)
return
}
if (response.statusCode === 201) {
const { uploadResults } = JSON.parse(body)
await saveFileUploads({
userId: req.context.userId!,
streamId: req.params.streamId,
branchName,
uploadResults
})
} else {
res.log.error(
{
statusCode: response.statusCode,
path: `http://127.0.0.1:${getPort()}/api/stream/${
req.params.streamId
}/blob`
},
'Error while uploading file.'
)
}
res.status(response.statusCode).send(body)
}
)
req.pipe(pipedReq)
}
)
if (isInitial) {
const listenForImportUpdates = listenForImportUpdatesFactory({
getFileInfo: getFileInfoFactory({ db }),
publish,
getStreamBranchByName: getStreamBranchByNameFactory({ db }),
addBranchCreatedActivity: addBranchCreatedActivityFactory({
listenFor('file_import_update', async (msg) => {
const parsedMessage = parseMessagePayload(msg.payload)
if (!parsedMessage.streamId) return
const projectDb = await getProjectDbClient({ projectId: parsedMessage.streamId })
await onFileImportProcessedFactory({
getFileInfo: getFileInfoFactory({ db: projectDb }),
publish,
saveActivity: saveActivityFactory({ db })
})
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
addBranchCreatedActivity: addBranchCreatedActivityFactory({
publish,
saveActivity: saveActivityFactory({ db })
})
})(parsedMessage)
})
listenFor('file_import_started', async (msg) => {
const parsedMessage = parseMessagePayload(msg.payload)
if (!parsedMessage.streamId) return
const projectDb = await getProjectDbClient({ projectId: parsedMessage.streamId })
await onFileProcessingFactory({
getFileInfo: getFileInfoFactory({ db: projectDb }),
publish
})(parsedMessage)
})
listenForImportUpdates()
}
}

Просмотреть файл

@ -3,19 +3,15 @@ import {
publish,
type PublishSubscription
} from '@/modules/shared/utils/subscriptions'
import { listenFor, MessageType } from '@/modules/core/utils/dbNotificationListener'
import {
ProjectFileImportUpdatedMessageType,
ProjectPendingModelsUpdatedMessageType,
ProjectPendingVersionsUpdatedMessageType
} from '@/modules/core/graph/generated/graphql'
import { trim } from 'lodash'
import { GetFileInfo } from '@/modules/fileuploads/domain/operations'
import { GetStreamBranchByName } from '@/modules/core/domain/branches/operations'
import { AddBranchCreatedActivity } from '@/modules/activitystream/domain/operations'
const branchCreatedPayloadRegexp = /^(.+):::(.+):::(.+):::(.+)$/i
type OnFileImportProcessedDeps = {
getFileInfo: GetFileInfo
getStreamBranchByName: GetStreamBranchByName
@ -23,12 +19,24 @@ type OnFileImportProcessedDeps = {
addBranchCreatedActivity: AddBranchCreatedActivity
}
const onFileImportProcessedFactory =
(deps: OnFileImportProcessedDeps) => async (msg: MessageType) => {
const [, uploadId, streamId, branchName, newBranchCreated] =
branchCreatedPayloadRegexp.exec(msg.payload) || [null, null, null]
const isNewBranch = newBranchCreated === '1'
type ParsedMessage = {
uploadId: string | null
streamId: string | null
branchName: string | null
isNewBranch: boolean
}
const branchCreatedPayloadRegexp = /^(.+):::(.+):::(.+):::(.+)$/i
export const parseMessagePayload = (payload: string): ParsedMessage => {
const [, uploadId, streamId, branchName, newBranchCreated] =
branchCreatedPayloadRegexp.exec(payload) || [null, null, null]
const isNewBranch = newBranchCreated === '1'
return { uploadId, streamId, branchName, isNewBranch }
}
export const onFileImportProcessedFactory =
(deps: OnFileImportProcessedDeps) =>
async ({ uploadId, streamId, branchName, isNewBranch }: ParsedMessage) => {
if (!uploadId || !streamId || !branchName) return
const [upload, branch] = await Promise.all([
@ -75,9 +83,10 @@ type OnFileProcessingDeps = {
publish: PublishSubscription
}
const onFileProcessingFactory =
(deps: OnFileProcessingDeps) => async (msg: MessageType) => {
const uploadId = trim(msg.payload)
export const onFileProcessingFactory =
(deps: OnFileProcessingDeps) =>
async ({ uploadId }: ParsedMessage) => {
if (!uploadId) return
const upload = await deps.getFileInfo({ fileId: uploadId })
if (!upload) return
@ -90,12 +99,3 @@ const onFileProcessingFactory =
projectId: upload.streamId
})
}
export const listenForImportUpdatesFactory =
(deps: OnFileImportProcessedDeps & OnFileProcessingDeps) => () => {
const onFileImportProcessed = onFileImportProcessedFactory(deps)
const onFileProcessing = onFileProcessingFactory(deps)
listenFor('file_import_update', onFileImportProcessed)
listenFor('file_import_started', onFileProcessing)
}

Просмотреть файл

@ -5,6 +5,7 @@ const getDbClients = require('./knex')
const fs = require('fs')
const metrics = require('./observability/prometheusMetrics')
const { logger } = require('./observability/logging')
const { wait } = require('@speckle/shared')
let shouldExit = false
const HEALTHCHECK_FILE_PATH = '/tmp/last_successful_query'
@ -131,7 +132,7 @@ const doStuff = async (dbClients) => {
)
).filter((t) => t)
if (!tasks.length) {
await new Promise((r) => setTimeout(r, 1000))
await wait(1000)
continue
}
@ -150,7 +151,6 @@ const doStuff = async (dbClients) => {
})
)
}
process.exit(0)
}
async function main() {
@ -165,6 +165,7 @@ async function main() {
const dbClients = Object.values(await getDbClients()).map((client) => client.public)
await doStuff(dbClients)
process.exit(0)
}
main()