feat(event-broker): wire up events from auth-server through event-broker to webhooks in local dev

This commit is contained in:
Danny Coates 2021-12-15 16:00:50 -08:00
Родитель 747d36fa99
Коммит dfb1cd9517
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4C442633C62E00CB
8 изменённых файлов: 73 добавлений и 15 удалений

28
_dev/goaws/goaws.yaml Normal file
Просмотреть файл

@ -0,0 +1,28 @@
Local: # Environment name that can be passed on the command line
# (i.e.: ./goaws [Local | Dev] -- defaults to 'Local')
Host: localhost # hostname of the goaws system (for docker-compose this is the tag name of the container)
# you can now use either 1 port for both sns and sqs or alternatively you can comment out Port and use SqsPort + SnsPort for compatibilyt with
# yopa and (fage-sns + face-sqs). If both ways are in the config file on the one "Port" will be used by GoAws
Port: 4100 # port to listen on.
# SqsPort: 9324 # alterante Sqs Port
# SnsPort: 9292 # alternate Sns Port
Region: us-east-1
AccountId: "100010001000"
LogToFile: false # Log messages (true/false)
LogFile: .st/goaws_messages.log # Log filename (for message logging
EnableDuplicates: false # Enable or not deduplication based on messageDeduplicationId
QueueAttributeDefaults: # default attributes for all queues
VisibilityTimeout: 30 # message visibility timeout
ReceiveMessageWaitTimeSeconds: 0 # receive message max wait time
MaximumMessageSize: 262144 # maximum message size (bytes)
Queues: # List of queues to create at startup
- Name: serviceNotifications # Queue name
ReceiveMessageWaitTimeSeconds: 20 # Queue receive message max wait time
Topics: # List of topic to create at startup
- Name: fxa-account-change-dev # Topic name - with some Subscriptions
Subscriptions: # List of Subscriptions to create for this topic (queues will be created as required)
- QueueName: serviceNotifications # Queue name
Raw: true # Raw message delivery (true/false)
RandomLatency: # Parameters for introducing random latency into message queuing
Min: 0 # Desired latency in milliseconds, if min and max are zero, no latency will be applied.
Max: 0 # Desired latency in milliseconds

Просмотреть файл

@ -1,3 +1,6 @@
#!/bin/sh -ex
docker run --rm --name goaws -p 4100:4100 pafortin/goaws
DIR=$(dirname "$0")
cd "$DIR/../_dev/goaws"
docker run --rm --name goaws -p 4100:4100 -v "$(pwd)":/conf pafortin/goaws

Просмотреть файл

@ -89,6 +89,11 @@ app.post('/api/todos/save', checkAuth, function (req, res) {
res.send(200);
});
app.post('/api/webhook', function (req, res) {
res.set('Content-Type', 'application/json');
res.send(req.body);
});
// the 'todo/get' api gets the current version of the todo list
// from the server
app.get('/api/todos/get', checkAuth, function (req, res) {

Просмотреть файл

@ -21,7 +21,7 @@
"user": "local",
"password": "local"
},
"snsTopicArn": "arn:aws:sns:local-01:000000000000:local-topic1",
"snsTopicArn": "arn:aws:sns:us-east-1:100010001000:fxa-account-change-dev",
"snsTopicEndpoint": "http://localhost:4100/",
"securityHistory": {
"ipProfiling": {

Просмотреть файл

@ -4,6 +4,9 @@
"clientUrl": "http://localhost:9000/v1/oauth/subscriptions/clients"
},
"env": "development",
"firestore": {
"enabled": true
},
"log": {
"fmt": "pretty",
"level": "debug"
@ -11,8 +14,8 @@
"openid": {
"issuer": "http://localhost:3030"
},
"firestore": {
"enabled": true
"pubsub": {
"authenticate": false
},
"serviceNotificationQueueUrl": "http://localhost:4100/serviceNotifications"
}

Просмотреть файл

@ -48,10 +48,13 @@ export class FirestoreService {
Authorization: 'Bearer owner',
},
port: 9090,
projectId: 'fx-event-broker',
projectId: 'demo-fxa',
servicePath: 'localhost',
sslCreds: grpc.credentials.createInsecure(),
});
this.db.collection(`${this.prefix}clients`).doc('dcdb5ae7add825d2').set({
webhookUrl: 'http://localhost:8080/api/webhook',
});
} else {
this.db = new Firestore(config);
}

Просмотреть файл

@ -12,7 +12,7 @@ export const GooglePubsubFactory: Provider = {
useFactory: async (config: ConfigService<AppConfig>) => {
const pubsubConfig = config.get('pubsub') as AppConfig['pubsub'];
if (pubsubConfig.audience === 'example.com') {
return new PubSub({ projectId: 'fxa-event-broker' });
return new PubSub({ projectId: 'demo-fxa' });
}
return new PubSub();
},

Просмотреть файл

@ -31,7 +31,8 @@ function extractRegionFromUrl(url: string) {
@Injectable()
export class QueueworkerService
implements OnApplicationBootstrap, OnApplicationShutdown {
implements OnApplicationBootstrap, OnApplicationShutdown
{
private region: string;
private topicPrefix: string;
private readonly app: Consumer;
@ -119,6 +120,25 @@ export class QueueworkerService
}
}
private async publishMessage(clientId: string, json: any) {
const topicName = this.topicPrefix + clientId;
if (this.pubsub.isEmulator) {
const topics = await this.pubsub.getTopics();
if (topics && topics[0]) {
const topic = topics[0].find((t) => t.name.endsWith(topicName));
if (!topic) {
const [newTopic] = await this.pubsub.createTopic(topicName);
await newTopic.createSubscription(`sub-${clientId}`, {
pushConfig: {
pushEndpoint: `http://host.docker.internal:8093/v1/proxy/${clientId}`,
},
});
}
}
}
return this.pubsub.topic(topicName).publishMessage({ json });
}
/**
* Generic fan-out of the message to the pubsub clientId queues.
*
@ -132,14 +152,13 @@ export class QueueworkerService
this.metrics.increment('message.type', { eventType });
const clientIds = await this.firestore.fetchClientIds(message.uid);
for (const clientId of clientIds) {
const topicName = this.topicPrefix + clientId;
const messageId = await this.pubsub.topic(topicName).publishJSON({
const messageId = await this.publishMessage(clientId, {
changeTime: message.timestamp ? message.timestamp : message.ts * 1000,
event: message.event,
timestamp: Date.now(),
uid: message.uid,
});
this.log.debug('publishedMessage', { topicName, messageId });
this.log.debug('publishedMessage', { clientId, messageId });
}
}
@ -205,7 +224,6 @@ export class QueueworkerService
const notifyClientPromises = Object.entries(notifyClientIds)
.filter(([clientId]) => clientIds.includes(clientId))
.map(async ([clientId, capabilities]) => {
const topicName = this.topicPrefix + clientId;
const rpMessage = Object.assign(
{},
{
@ -215,10 +233,8 @@ export class QueueworkerService
}
);
const messageId = await this.pubsub
.topic(topicName)
.publishJSON(rpMessage);
this.log.debug('publishedMessage', { topicName, messageId });
const messageId = await this.publishMessage(clientId, rpMessage);
this.log.debug('publishedMessage', { clientId, messageId });
});
await Promise.all(notifyClientPromises);
}