Add Event Grid Logical decoding example
This commit is contained in:
Родитель
ceb96cbe4a
Коммит
ef8110302c
|
@ -0,0 +1,25 @@
|
|||
|
||||
Microsoft Visual Studio Solution File, Format Version 12.00
|
||||
# Visual Studio Version 17
|
||||
VisualStudioVersion = 17.3.32819.101
|
||||
MinimumVisualStudioVersion = 10.0.40219.1
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LogicalDecodingPublisher", "LogicalDecodingPublisher\LogicalDecodingPublisher.csproj", "{74B7B678-5FEC-402A-AB68-3319294AF740}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
Release|Any CPU = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(ProjectConfigurationPlatforms) = postSolution
|
||||
{74B7B678-5FEC-402A-AB68-3319294AF740}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{74B7B678-5FEC-402A-AB68-3319294AF740}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{74B7B678-5FEC-402A-AB68-3319294AF740}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{74B7B678-5FEC-402A-AB68-3319294AF740}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {7BE8FAB0-707C-462A-9687-A1D8E8C5D36D}
|
||||
EndGlobalSection
|
||||
EndGlobal
|
|
@ -0,0 +1,9 @@
|
|||
<Application x:Class="LogicalDecodingPublisher.App"
|
||||
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
|
||||
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
|
||||
xmlns:local="clr-namespace:LogicalDecodingPublisher"
|
||||
StartupUri="MainWindow.xaml">
|
||||
<Application.Resources>
|
||||
|
||||
</Application.Resources>
|
||||
</Application>
|
|
@ -0,0 +1,17 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Configuration;
|
||||
using System.Data;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using System.Windows;
|
||||
|
||||
namespace LogicalDecodingPublisher
|
||||
{
|
||||
/// <summary>
|
||||
/// Interaction logic for App.xaml
|
||||
/// </summary>
|
||||
public partial class App : Application
|
||||
{
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
using System.Windows;
|
||||
|
||||
[assembly: ThemeInfo(
|
||||
ResourceDictionaryLocation.None, //where theme specific resource dictionaries are located
|
||||
//(used if a resource is not found in the page,
|
||||
// or application resource dictionaries)
|
||||
ResourceDictionaryLocation.SourceAssembly //where the generic resource dictionary is located
|
||||
//(used if a resource is not found in the page,
|
||||
// app, or any theme specific resource dictionaries)
|
||||
)]
|
|
@ -0,0 +1,15 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>WinExe</OutputType>
|
||||
<TargetFramework>net6.0-windows</TargetFramework>
|
||||
<Nullable>enable</Nullable>
|
||||
<UseWPF>true</UseWPF>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Azure.Messaging.EventGrid" Version="4.11.0" />
|
||||
<PackageReference Include="Npgsql" Version="6.0.6" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
|
@ -0,0 +1,61 @@
|
|||
<Window x:Class="LogicalDecodingPublisher.MainWindow"
|
||||
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
|
||||
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
|
||||
xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
|
||||
xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
|
||||
xmlns:local="clr-namespace:LogicalDecodingPublisher"
|
||||
mc:Ignorable="d"
|
||||
Title="Logical Decoding Publisher" Height="450" Width="800" WindowStartupLocation="CenterScreen" ResizeMode="NoResize">
|
||||
<Grid>
|
||||
<GroupBox Header="Database connection" Margin="10,10,0,217" HorizontalAlignment="Left" Width="374" VerticalAlignment="Top">
|
||||
<Grid Height="157" Margin="0,0,-2,0">
|
||||
<Grid.ColumnDefinitions>
|
||||
<ColumnDefinition Width="17*"/>
|
||||
<ColumnDefinition Width="74*"/>
|
||||
</Grid.ColumnDefinitions>
|
||||
<Label Content="Hostname:" HorizontalAlignment="Left" Margin="10,10,0,0" VerticalAlignment="Top" Grid.ColumnSpan="2"/>
|
||||
<Label Content="User:" HorizontalAlignment="Left" Margin="10,41,0,0" VerticalAlignment="Top"/>
|
||||
<Label Content="Password:" HorizontalAlignment="Left" Margin="10,72,0,0" VerticalAlignment="Top" Grid.ColumnSpan="2"/>
|
||||
<Label Content="Database:" HorizontalAlignment="Left" Margin="10,103,0,0" VerticalAlignment="Top" Grid.ColumnSpan="2"/>
|
||||
<CheckBox x:Name="requireSslCheckbox" Content="Require SSL mode" HorizontalAlignment="Left" Margin="10,134,0,0" VerticalAlignment="Top" RenderTransformOrigin="0.664,1.992" TabIndex="4" Grid.ColumnSpan="2"/>
|
||||
<TextBox x:Name="serverTxt" HorizontalAlignment="Left" Margin="17,14,0,0" VerticalAlignment="Top" Width="266" TabIndex="0" Grid.Column="1"/>
|
||||
<TextBox x:Name="userTxt" HorizontalAlignment="Left" Margin="17,45,0,0" VerticalAlignment="Top" Width="266" TabIndex="1" Grid.Column="1"/>
|
||||
<TextBox x:Name="databaseTxt" HorizontalAlignment="Left" Margin="17,107,0,0" VerticalAlignment="Top" Width="266" TabIndex="3" Grid.Column="1"/>
|
||||
<PasswordBox x:Name="passwordTxt" HorizontalAlignment="Left" Margin="17,76,0,0" VerticalAlignment="Top" Width="266" TabIndex="2" Grid.Column="1"/>
|
||||
</Grid>
|
||||
</GroupBox>
|
||||
<GroupBox Header="Logical decoding" Margin="10,217,0,0" HorizontalAlignment="Left" Width="374" VerticalAlignment="Top" Height="180">
|
||||
<Grid>
|
||||
<Label Content="Slot name:" HorizontalAlignment="Left" Margin="10,10,0,0" VerticalAlignment="Top"/>
|
||||
<TextBox x:Name="slotTxt" HorizontalAlignment="Left" Margin="122,14,0,0" VerticalAlignment="Top" Width="200" TabIndex="7"/>
|
||||
<Button x:Name="startBtn" Content="Start" HorizontalAlignment="Left" Margin="43,127,0,0" VerticalAlignment="Top" Width="60" TabIndex="9" Click="startBtn_Click"/>
|
||||
<Button x:Name="stopBtn" Content="Stop" HorizontalAlignment="Left" Margin="252,127,0,0" VerticalAlignment="Top" Width="60" IsEnabled="False" TabIndex="10" Click="stopBtn_Click"/>
|
||||
<Label Content="Plugin name:" HorizontalAlignment="Left" Margin="10,41,0,0" VerticalAlignment="Top"/>
|
||||
<ComboBox x:Name="pluginTxt" HorizontalAlignment="Left" Margin="122,45,0,0" VerticalAlignment="Top" Width="200" SelectedValuePath="Content">
|
||||
<ComboBoxItem Content="wal2json"/>
|
||||
<ComboBoxItem Content="test_decoding"/>
|
||||
<ComboBoxItem Content="pgoutput"/>
|
||||
</ComboBox>
|
||||
<Label x:Name="publicationLabel" Content="Publication name:" HorizontalAlignment="Left" Margin="10,79,0,0" VerticalAlignment="Top"/>
|
||||
<TextBox x:Name="publicationTxt" HorizontalAlignment="Left" Margin="122,83,0,0" VerticalAlignment="Top" Width="200" TabIndex="7"/>
|
||||
</Grid>
|
||||
</GroupBox>
|
||||
<GroupBox HorizontalAlignment="Left" Height="177" Header="Event Grid connection" Margin="413,10,0,0" VerticalAlignment="Top" Width="358">
|
||||
<Grid>
|
||||
<Grid.ColumnDefinitions>
|
||||
<ColumnDefinition Width="39*"/>
|
||||
<ColumnDefinition Width="132*"/>
|
||||
<ColumnDefinition Width="191*"/>
|
||||
</Grid.ColumnDefinitions>
|
||||
<Label Content="Topic endpoint:" HorizontalAlignment="Left" Margin="10,10,0,0" VerticalAlignment="Top" Grid.ColumnSpan="2"/>
|
||||
<Label Content="Topic key:" HorizontalAlignment="Left" Margin="10,41,0,0" VerticalAlignment="Top" Grid.ColumnSpan="2"/>
|
||||
<TextBox x:Name="topicEndpointTxt" HorizontalAlignment="Left" Margin="81,14,0,0" VerticalAlignment="Top" Width="210" TabIndex="5" Grid.ColumnSpan="2" Grid.Column="1"/>
|
||||
<PasswordBox x:Name="topicKeyTxt" HorizontalAlignment="Left" Margin="81,45,0,0" VerticalAlignment="Top" Width="210" TabIndex="6" Grid.ColumnSpan="2" Grid.Column="1"/>
|
||||
</Grid>
|
||||
</GroupBox>
|
||||
<GroupBox Header="Output" Margin="413,217,0,0" VerticalAlignment="Top" HorizontalAlignment="Left" Height="180" Width="358">
|
||||
<TextBox x:Name="outputTxt" Margin="0,0,-2,0" TextWrapping="Wrap" IsReadOnly="True" TabIndex="11" ScrollViewer.CanContentScroll="True" VerticalScrollBarVisibility="Visible"/>
|
||||
</GroupBox>
|
||||
|
||||
</Grid>
|
||||
</Window>
|
|
@ -0,0 +1,275 @@
|
|||
using Azure;
|
||||
using Azure.Messaging.EventGrid;
|
||||
using Npgsql;
|
||||
using Npgsql.Replication;
|
||||
using Npgsql.Replication.Internal;
|
||||
using Npgsql.Replication.PgOutput;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Data;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Text.Json.Nodes;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using System.Windows;
|
||||
using System.Windows.Controls;
|
||||
using System.Windows.Data;
|
||||
using System.Windows.Documents;
|
||||
using System.Windows.Input;
|
||||
using System.Windows.Media;
|
||||
using System.Windows.Media.Imaging;
|
||||
using System.Windows.Navigation;
|
||||
using System.Windows.Shapes;
|
||||
using System.Xml.Linq;
|
||||
|
||||
namespace LogicalDecodingPublisher
|
||||
{
|
||||
/// <summary>
|
||||
/// Interaction logic for MainWindow.xaml
|
||||
/// </summary>
|
||||
public partial class MainWindow : Window
|
||||
{
|
||||
#region PGSQL Access
|
||||
|
||||
private const string server = "";
|
||||
private const string user = "";
|
||||
private const string password = "";
|
||||
private const string database = "";
|
||||
private const bool requireSsl = true;
|
||||
|
||||
private LogicalReplicationConnection pgLogicalConnection;
|
||||
|
||||
/// <summary>
|
||||
/// Get Npgsql connection string.
|
||||
/// </summary>
|
||||
/// <param name="syncNotification">True to sync notifications.</param>
|
||||
/// <returns>The connection string.</returns>
|
||||
private string GetConnectionString(bool syncNotification)
|
||||
{
|
||||
var csb = new NpgsqlConnectionStringBuilder
|
||||
{
|
||||
Host = serverTxt.Text,
|
||||
Database = databaseTxt.Text,
|
||||
Username = userTxt.Text,
|
||||
Password = passwordTxt.Password,
|
||||
KeepAlive = 1,
|
||||
SslMode = requireSslCheckbox.IsChecked.HasValue && (bool)requireSslCheckbox.IsChecked ? SslMode.Require : SslMode.Prefer,
|
||||
TrustServerCertificate = true,
|
||||
};
|
||||
|
||||
Debug.Print(csb.ConnectionString);
|
||||
return csb.ConnectionString;
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Event Grid Access
|
||||
|
||||
private const string topicEndpoint = "";
|
||||
private const string topicKey = "";
|
||||
|
||||
private EventGridPublisherClient eventGridClient;
|
||||
|
||||
#endregion
|
||||
|
||||
#region Logical Decoding Configuration
|
||||
|
||||
private const string slotName = "";
|
||||
private const string publicationName = "";
|
||||
|
||||
private CancellationTokenSource cancellationTokenSource;
|
||||
|
||||
#endregion
|
||||
|
||||
#region Main functions
|
||||
|
||||
/// <summary>
|
||||
/// Connect to Event Grid and PG database, create a replication slot, and start listening to messages coming from replication slot.
|
||||
/// </summary>
|
||||
private async void Start()
|
||||
{
|
||||
try
|
||||
{
|
||||
this.eventGridClient = new EventGridPublisherClient(new Uri(topicEndpointTxt.Text), new AzureKeyCredential(topicKeyTxt.Password));
|
||||
|
||||
string connectionString = this.GetConnectionString(true);
|
||||
this.pgLogicalConnection = new LogicalReplicationConnection(connectionString);
|
||||
await this.pgLogicalConnection.Open();
|
||||
|
||||
await this.pgLogicalConnection.CreateLogicalReplicationSlot(slotTxt.Text, pluginTxt.Text);
|
||||
|
||||
this.startBtn.IsEnabled = false;
|
||||
this.stopBtn.IsEnabled = true;
|
||||
this.outputTxt.Text = String.Empty;
|
||||
|
||||
try
|
||||
{
|
||||
var options = new List<KeyValuePair<string, string?>>();
|
||||
if (pluginTxt.Text == "pgoutput")
|
||||
{
|
||||
options.Add(new KeyValuePair<string, string?>("proto_version", "1"));
|
||||
options.Add(new KeyValuePair<string, string?>("publication_names", publicationTxt.Text));
|
||||
}
|
||||
|
||||
this.cancellationTokenSource = new CancellationTokenSource();
|
||||
|
||||
await foreach (var message in this.pgLogicalConnection.StartLogicalReplication(new PgOutputReplicationSlot(slotTxt.Text), this.cancellationTokenSource.Token, null, options))
|
||||
{
|
||||
using StreamReader reader = new StreamReader(message.Data);
|
||||
string rawData = reader.ReadToEnd();
|
||||
DateTime time = DateTime.Now;
|
||||
string type = message.GetType().Name;
|
||||
|
||||
string logData = String.Format("[{0:MM/dd/yy HH:mm:ss}] [{1}]\n{2}\n\n", time, type, rawData);
|
||||
outputTxt.AppendText(logData);
|
||||
outputTxt.ScrollToEnd();
|
||||
Debug.Print(logData);
|
||||
|
||||
this.PublishEvent(time, type, rawData);
|
||||
|
||||
pgLogicalConnection.SetReplicationStatus(message.WalEnd);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
//
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
this.startBtn.IsEnabled = true;
|
||||
this.stopBtn.IsEnabled = false;
|
||||
MessageBox.Show(ex.Message, "Starting failed", MessageBoxButton.OK, MessageBoxImage.Error);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await this.pgLogicalConnection.DropReplicationSlot(slotTxt.Text);
|
||||
await this.pgLogicalConnection.DisposeAsync();
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
MessageBox.Show(ex.Message, "Connection failed", MessageBoxButton.OK, MessageBoxImage.Error);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stop listening to messages coming from replication slot.
|
||||
/// </summary>
|
||||
private void Stop()
|
||||
{
|
||||
try
|
||||
{
|
||||
this.cancellationTokenSource.Cancel();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
MessageBox.Show(ex.Message, "Stopping failed", MessageBoxButton.OK, MessageBoxImage.Error);
|
||||
}
|
||||
|
||||
this.startBtn.IsEnabled = true;
|
||||
this.stopBtn.IsEnabled = false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Publish an event to Event Grid.
|
||||
/// </summary>
|
||||
private async void PublishEvent(DateTime time, string type, string rawData)
|
||||
{
|
||||
object? data;
|
||||
try
|
||||
{
|
||||
data = JsonObject.Parse(rawData);
|
||||
}
|
||||
catch
|
||||
{
|
||||
data = rawData;
|
||||
}
|
||||
|
||||
EventGridEvent eventGridEvent = new EventGridEvent("Logical Replication Event", type, "1.0", data)
|
||||
{
|
||||
Id = Guid.NewGuid().ToString(),
|
||||
EventTime = time,
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
await this.eventGridClient.SendEventAsync(eventGridEvent);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
MessageBox.Show(ex.Message, "Event Grid error", MessageBoxButton.OK, MessageBoxImage.Error);
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
public MainWindow()
|
||||
{
|
||||
InitializeComponent();
|
||||
this.Loaded += MainWindow_Loaded;
|
||||
this.Closed += MainWindow_Closed;
|
||||
this.pluginTxt.SelectionChanged += PluginTxt_SelectionChanged;
|
||||
}
|
||||
|
||||
private void PluginTxt_SelectionChanged(object sender, SelectionChangedEventArgs e)
|
||||
{
|
||||
if (this.pluginTxt.SelectedValue.ToString() == "pgoutput")
|
||||
{
|
||||
publicationLabel.Visibility = Visibility.Visible;
|
||||
publicationTxt.Visibility = Visibility.Visible;
|
||||
}
|
||||
else
|
||||
{
|
||||
publicationLabel.Visibility = Visibility.Hidden;
|
||||
publicationTxt.Visibility = Visibility.Hidden;
|
||||
}
|
||||
}
|
||||
|
||||
private void MainWindow_Closed(object? sender, EventArgs e)
|
||||
{
|
||||
if (!stopBtn.IsEnabled)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
this.Stop();
|
||||
}
|
||||
|
||||
private void MainWindow_Loaded(object sender, RoutedEventArgs e)
|
||||
{
|
||||
serverTxt.Text = server;
|
||||
userTxt.Text = user;
|
||||
passwordTxt.Password = password;
|
||||
databaseTxt.Text = database;
|
||||
requireSslCheckbox.IsChecked = requireSsl;
|
||||
topicEndpointTxt.Text = topicEndpoint;
|
||||
topicKeyTxt.Password = topicKey;
|
||||
slotTxt.Text = slotName;
|
||||
pluginTxt.SelectedIndex = 0;
|
||||
publicationTxt.Text = publicationName;
|
||||
}
|
||||
|
||||
private void startBtn_Click(object sender, RoutedEventArgs e)
|
||||
{
|
||||
if (!startBtn.IsEnabled)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
this.Start();
|
||||
}
|
||||
|
||||
private void stopBtn_Click(object sender, RoutedEventArgs e)
|
||||
{
|
||||
if (!stopBtn.IsEnabled)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
this.Stop();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,203 @@
|
|||
# Event Grid with PostgreSQL Logical Decoding and wal2json
|
||||
|
||||
## Using Microsoft EventGrid and Azure Database for PostgreSQL flexible server
|
||||
In this example we are using [Logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) from database side to stream data manipulation changes (DML) such as ```INSERT```, ```UPDATE``` and ```DELETE``` through a [replication slot](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html#LOGICALDECODING-REPLICATION-SLOTS) with plugin ```wal2json``` to external consumers. In this case, the sample app is an asynchronous consumer that is listening to the replication slot and publishing the events to Azure EventGrid. Finally, the subscribers to the EventGrid topic will process these changes.
|
||||
|
||||
## Pre-requisites
|
||||
|
||||
### Setup the Azure Database for PostgreSQL flexible server
|
||||
- Use an existing server or create a [new one](https://docs.microsoft.com/en-us/azure/postgresql/flexible-server/quickstart-create-server-portal).
|
||||
- Postgres terminal [psql](https://www.postgresql.org/docs/current/app-psql.html) installed on your computer.
|
||||
|
||||
### Configure logical decoding
|
||||
1. Logical decoding and [read replicas](https://docs.microsoft.com/en-us/azure/postgresql/single-server/concepts-read-replicas) both depend on Postgres [Write Ahead Log (WAL)](https://www.postgresql.org/docs/current/wal-intro.html) and require different levels of logging from Postgres. The lowest level is ```off``` which puts the least information in the WAL, the next level is ```replica``` which is more verbose and is the minimum level required for read replicas to work, and the highest level is ```logical``` which is even more verbose and is the minimum level required for logical decoding to work (read replicas also work here). Hence, we need to set the server parameter ```wal_level``` to ```logical```. This can be done using Azure portal or Azure CLI:
|
||||
|
||||
```bash
|
||||
az postgres flexible-server parameter set -g <resourceGroup> -s <serverName> --name wal_level --value logical
|
||||
```
|
||||
2. Set the server parameter ```max_replication_slots``` to a value greater than 0. This value specifies the maximum number of replication slots that the server can support.
|
||||
|
||||
```bash
|
||||
az postgres flexible-server parameter set -g <resourceGroup> -s <serverName> --name max_replication_slots --value 10
|
||||
```
|
||||
3. Set the server parameter ```max_wal_senders``` to a value greater than 0. Thsi value specifies the maximum number of concurrent connections from standby servers or streaming base backup clients (i.e., the maximum number of simultaneously running WAL sender processes).
|
||||
|
||||
```bash
|
||||
az postgres flexible-server parameter set -g <resourceGroup> -s <serverName> --name max_wal_senders --value 10
|
||||
```
|
||||
4. Restart the server to apply the changes.
|
||||
|
||||
```bash
|
||||
az postgres flexible-server restart -g <resourceGroup> -n <serverName>
|
||||
```
|
||||
5. Ensure that the user in the database server has replication privileges. To do that, connect to the server:
|
||||
|
||||
```bash
|
||||
psql -h <hostName> -U <userName> -d <databaseName>
|
||||
```
|
||||
And then, grant the user the replication privileges:
|
||||
|
||||
```sql
|
||||
alter user <userName> REPLICATION;
|
||||
```
|
||||
To verify that the user has the replication privileges, run the following:
|
||||
|
||||
```sql
|
||||
\du
|
||||
```
|
||||
You should see ```Replication``` in the ```Attributes``` column for the user:
|
||||
|
||||
```
|
||||
List of roles
|
||||
Role name | Attributes | Member of
|
||||
----------------+------------------------------------------------------------+-----------------------------------------------------------------------------
|
||||
<userName> | Create role, Create DB, Replication | {pg_read_all_settings,pg_read_all_stats,pg_stat_scan_tables,azure_pg_admin}
|
||||
```
|
||||
|
||||
|
||||
### Create EventGrid topic
|
||||
Follow the steps https://docs.microsoft.com/en-us/azure/event-grid/custom-event-quickstart-portal#create-a-custom-topic to create a custom event grid topic.
|
||||
|
||||
## To run the sample code
|
||||
|
||||
In file ```\Logical Decoding\LogicalDecodingPublisher\MainWindow.xaml.cs``` fill in the PostgreSQL Server connection information, Event Gridendpoint and key, and the replication slot name. Or you can just run the app and fill in the information in the UI.
|
||||
|
||||
## Detailed steps
|
||||
|
||||
### Step 1: Create the replication slot
|
||||
Connect to the database with a user that has replication privileges. Then, create a replication slot:
|
||||
|
||||
```sql
|
||||
SELECT * FROM pg_create_logical_replication_slot('<slotName>', '<pluginName>');
|
||||
```
|
||||
|
||||
The first argument is the name of the replication slot and the second argument is the name of the logical decoding plugin. Azure Database for PostgreSQL flexible server supports ```wal2json```, ```pgoutput``` and ```test_decoding``` plugins. ```wal2json``` is the plugin and is recommended for production use. ```test_decoding``` is a simple plugin that is useful for testing and debugging.
|
||||
|
||||
In our example app, by using the library ```Npgsql.Replication``` we can open the connection to the database and create the replication slot like this:
|
||||
|
||||
```csharp
|
||||
string connectionString = this.GetConnectionString(true);
|
||||
this.pgLogicalConnection = new LogicalReplicationConnection(connectionString);
|
||||
await this.pgLogicalConnection.Open();
|
||||
await this.pgLogicalConnection.CreateLogicalReplicationSlot(slotTxt.Text, pluginTxt.Text);
|
||||
```
|
||||
|
||||
### Step 2: Start an asynchronous consumer
|
||||
To start listening to the output of the replication slot streaming we just created, we need to use the method ```StartLogicalReplication``` that will return us an asynchronous iterator. This iterator will yield a message for each data manipulation change that is done on the database. In our example app we print the message to the UI and to the debug console, as well as publish the message to Event Grid:
|
||||
|
||||
```csharp
|
||||
this.cancellationTokenSource = new CancellationTokenSource();
|
||||
await foreach (var message in this.pgLogicalConnection.StartLogicalReplication(new PgOutputReplicationSlot(slotTxt.Text), this.cancellationTokenSource.Token, null, options))
|
||||
{
|
||||
using StreamReader reader = new StreamReader(message.Data);
|
||||
string rawData = reader.ReadToEnd();
|
||||
DateTime time = DateTime.Now;
|
||||
string type = message.GetType().Name;
|
||||
|
||||
string logData = String.Format("[{0:MM/dd/yy HH:mm:ss}] [{1}]\n{2}\n\n", time, type, rawData);
|
||||
outputTxt.AppendText(logData);
|
||||
outputTxt.ScrollToEnd();
|
||||
Debug.Print(logData);
|
||||
|
||||
this.PublishEvent(time, type, rawData);
|
||||
|
||||
pgLogicalConnection.SetReplicationStatus(message.WalEnd);
|
||||
}
|
||||
```
|
||||
|
||||
We can also consume the messages one by one manually if we are connected to the database:
|
||||
|
||||
```sql
|
||||
SELECT data FROM pg_logical_slot_get_changes('<slotName>', NULL, NULL, 'pretty-print', '1');
|
||||
```
|
||||
|
||||
Or stream them using ```pg_recvlogical```:
|
||||
|
||||
```bash
|
||||
pg_recvlogical -h <hostName> -d <databaseName> -U <userName> --slot <slotName> --start -o pretty-print=1 -f –
|
||||
```
|
||||
|
||||
### Step 3: Configure the connection to Event Grid
|
||||
To publish the messages to Event Grid we need to create a client, which is an instance of the ```EventGridPublisherClient``` class where we specify end topic endpoint and the key:
|
||||
|
||||
```csharp
|
||||
this.eventGridClient = new EventGridPublisherClient(new Uri(topicEndpointTxt.Text), new AzureKeyCredential(topicKeyTxt.Password));
|
||||
```
|
||||
|
||||
And then, we can publish the messages to Event Grid using the method ```SendEventsAsync```. Note that the plugin ```wal2json``` returns the messages in JSON format, so we can deserialize the message to a ```JObject``` and then publish it to Event Grid:
|
||||
|
||||
```csharp
|
||||
private async void PublishEvent(DateTime time, string type, string rawData)
|
||||
{
|
||||
object? data;
|
||||
try
|
||||
{
|
||||
data = JsonObject.Parse(rawData);
|
||||
}
|
||||
catch
|
||||
{
|
||||
data = rawData;
|
||||
}
|
||||
|
||||
EventGridEvent eventGridEvent = new EventGridEvent("Logical Replication Event", type, "1.0", data)
|
||||
{
|
||||
Id = Guid.NewGuid().ToString(),
|
||||
EventTime = time,
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
await this.eventGridClient.SendEventAsync(eventGridEvent);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
MessageBox.Show(ex.Message, "Event Grid error", MessageBoxButton.OK, MessageBoxImage.Error);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Step 4: Make data changes to the database
|
||||
To see the messages in the UI, in the debug console and in Event Grid, we need to make some data changes to the database. For example, we can create a table and issue some insertions, updates and deletions:
|
||||
|
||||
```sql
|
||||
CREATE TABLE inventory (id SERIAL, item VARCHAR(30), qty INT, PRIMARY KEY(id));
|
||||
INSERT INTO inventory (item, qty) VALUES ('apples', '100');
|
||||
UPDATE inventory SET qty = 96 WHERE item = 'apples';
|
||||
DELETE FROM inventory WHERE item = 'apples';
|
||||
```
|
||||
|
||||
The output in the UI and in the debug console should look like this:
|
||||
|
||||
```json
|
||||
[09/08/22 02:52:20] [XLogDataMessage]
|
||||
{"change":[]}
|
||||
|
||||
[09/08/22 02:52:21] [XLogDataMessage]
|
||||
{"change":[{"kind":"insert","schema":"public","table":"inventory","columnnames":["id","item","qty"],"columntypes":["integer","character varying(30)","integer"],"columnvalues":[1,"apples",100]}]}
|
||||
|
||||
[09/08/22 02:52:21] [XLogDataMessage]
|
||||
{"change":[{"kind":"update","schema":"public","table":"inventory","columnnames":["id","item","qty"],"columntypes":["integer","character varying(30)","integer"],"columnvalues":[1,"apples",96],"oldkeys":{"keynames":["id"],"keytypes":["integer"],"keyvalues":[1]}}]}
|
||||
|
||||
[09/08/22 02:52:21] [XLogDataMessage]
|
||||
{"change":[{"kind":"delete","schema":"public","table":"inventory","oldkeys":{"keynames":["id"],"keytypes":["integer"],"keyvalues":[1]}}]}
|
||||
```
|
||||
|
||||
### Step 5: Stop the asynchronous consumer and drop the replication slot
|
||||
To stop the asynchronous consumer, we need to cancel the ```cancellationTokenSource``` that we created in step 2:
|
||||
|
||||
```csharp
|
||||
this.cancellationTokenSource.Cancel();
|
||||
```
|
||||
|
||||
And after that, we can drop the replication slot and close the connection:
|
||||
|
||||
```csharp
|
||||
await this.pgLogicalConnection.DropReplicationSlot(slotTxt.Text);
|
||||
await this.pgLogicalConnection.DisposeAsync();
|
||||
```
|
||||
|
||||
We can also drop the replication slot manually with:
|
||||
|
||||
```sql
|
||||
SELECT pg_drop_replication_slot('<slotName>');
|
||||
```
|
Загрузка…
Ссылка в новой задаче