Initial Redis Codec Implementation (#161)

Motivation:
Added/Ported Redis codec support into DotNetty

Modifications:
- Added Redis Codec project
- Added Redis Codec unit test project.
- Extra: Added DefaultByteBufferHolder to DotNetty.Buffers.
- Extra: Added UnrealeasableByteBuffer to DotNetty.Buffers.

Result:
Redis codec supported by DotNetty.
This commit is contained in:
Johnny Z 2016-10-11 02:57:57 +10:00 коммит произвёл Max Gortman
Родитель 7aef9c7500
Коммит 6fc89419c8
44 изменённых файлов: 2870 добавлений и 0 удалений

Просмотреть файл

@ -85,6 +85,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SecureChat.Client", "exampl
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SecureChat.Server", "examples\SecureChat.Server\SecureChat.Server.csproj", "{DDA47BCE-752F-4DEE-BB75-28E7D6921E79}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetty.Codecs.Redis", "src\DotNetty.Codecs.Redis\DotNetty.Codecs.Redis.csproj", "{6AE7CF76-971C-428B-853A-DBE73A861E60}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetty.Codecs.Redis.Tests", "test\DotNetty.Codecs.Redis.Tests\DotNetty.Codecs.Redis.Tests.csproj", "{258ABFFC-4AF5-4CCA-9145-929AC854D139}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -246,6 +250,18 @@ Global
{DDA47BCE-752F-4DEE-BB75-28E7D6921E79}.Release|Any CPU.Build.0 = Release|Any CPU
{DDA47BCE-752F-4DEE-BB75-28E7D6921E79}.Signed|Any CPU.ActiveCfg = Release|Any CPU
{DDA47BCE-752F-4DEE-BB75-28E7D6921E79}.Signed|Any CPU.Build.0 = Release|Any CPU
{6AE7CF76-971C-428B-853A-DBE73A861E60}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6AE7CF76-971C-428B-853A-DBE73A861E60}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6AE7CF76-971C-428B-853A-DBE73A861E60}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6AE7CF76-971C-428B-853A-DBE73A861E60}.Release|Any CPU.Build.0 = Release|Any CPU
{6AE7CF76-971C-428B-853A-DBE73A861E60}.Signed|Any CPU.ActiveCfg = Release|Any CPU
{6AE7CF76-971C-428B-853A-DBE73A861E60}.Signed|Any CPU.Build.0 = Release|Any CPU
{258ABFFC-4AF5-4CCA-9145-929AC854D139}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{258ABFFC-4AF5-4CCA-9145-929AC854D139}.Debug|Any CPU.Build.0 = Debug|Any CPU
{258ABFFC-4AF5-4CCA-9145-929AC854D139}.Release|Any CPU.ActiveCfg = Release|Any CPU
{258ABFFC-4AF5-4CCA-9145-929AC854D139}.Release|Any CPU.Build.0 = Release|Any CPU
{258ABFFC-4AF5-4CCA-9145-929AC854D139}.Signed|Any CPU.ActiveCfg = Release|Any CPU
{258ABFFC-4AF5-4CCA-9145-929AC854D139}.Signed|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -283,5 +299,7 @@ Global
{C98D8C38-759D-492E-B5E5-B4BBA65A6AC5} = {2B766264-D269-415C-8F2A-5AFC44409C01}
{B6C42665-2A5B-4BFD-B5B9-965C31A383D3} = {C98D8C38-759D-492E-B5E5-B4BBA65A6AC5}
{DDA47BCE-752F-4DEE-BB75-28E7D6921E79} = {C98D8C38-759D-492E-B5E5-B4BBA65A6AC5}
{6AE7CF76-971C-428B-853A-DBE73A861E60} = {F02D7F30-ABA7-4438-8D28-10898E731906}
{258ABFFC-4AF5-4CCA-9145-929AC854D139} = {6A0821D4-8A5D-42AD-8E3F-F519100F4AD8}
EndGlobalSection
EndGlobal

Просмотреть файл

@ -122,6 +122,7 @@ Target "CopyOutput" (fun _ ->
"DotNetty.Codecs"
"DotNetty.Handlers"
"DotNetty.Codecs.Mqtt"
"DotNetty.Codecs.Redis"
]
|> List.iter copyOutput
)
@ -141,6 +142,7 @@ Target "ResignAssemblies" (fun _ ->
"DotNetty.Codecs"
"DotNetty.Handlers"
"DotNetty.Codecs.Mqtt"
"DotNetty.Codecs.Redis"
]
|> List.iter copyOutput
)

Просмотреть файл

@ -0,0 +1,84 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Buffers
{
using System.Diagnostics.Contracts;
using DotNetty.Common;
public abstract class DefaultByteBufferHolder : IByteBufferHolder
{
readonly IByteBuffer buffer;
protected DefaultByteBufferHolder(IByteBuffer buffer)
{
Contract.Requires(buffer != null);
this.buffer = buffer;
}
public IByteBuffer Content
{
get
{
if (this.buffer.ReferenceCount <= 0)
{
throw new IllegalReferenceCountException(this.buffer.ReferenceCount);
}
return this.buffer;
}
}
public virtual int ReferenceCount => this.buffer.ReferenceCount;
public virtual IReferenceCounted Retain()
{
this.buffer.Retain();
return this;
}
public virtual IReferenceCounted Retain(int increment)
{
this.buffer.Retain(increment);
return this;
}
public virtual IReferenceCounted Touch()
{
this.buffer.Touch();
return this;
}
public virtual IReferenceCounted Touch(object hint)
{
this.buffer.Touch(hint);
return this;
}
public virtual bool Release() => this.buffer.Release();
public virtual bool Release(int decrement) => this.buffer.Release(decrement);
public override bool Equals(object obj)
{
if (ReferenceEquals(this, obj))
{
return true;
}
if (obj is DefaultByteBufferHolder)
{
return this.buffer.Equals(((DefaultByteBufferHolder)obj).buffer);
}
return false;
}
public override int GetHashCode() => this.buffer.GetHashCode();
public abstract IByteBufferHolder Copy();
public abstract IByteBufferHolder Duplicate();
}
}

Просмотреть файл

@ -52,6 +52,7 @@
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="AdvancedLeakAwareByteBuffer.cs" />
<Compile Include="DefaultByteBufferHolder.cs" />
<Compile Include="Properties\Friends.cs" />
<Compile Include="IPoolArenaMetric.cs" />
<Compile Include="IPoolChunkListMetric.cs" />
@ -85,6 +86,7 @@
<Compile Include="Unpooled.cs" />
<Compile Include="UnpooledByteBufferAllocator.cs" />
<Compile Include="UnpooledHeapByteBuffer.cs" />
<Compile Include="UnreleasableByteBuffer.cs" />
<Compile Include="WrappedByteBuf.cs" />
</ItemGroup>
<ItemGroup>

Просмотреть файл

@ -58,6 +58,20 @@ namespace DotNetty.Buffers
return WrappedBuffer(array).Slice(offset, length);
}
public static IByteBuffer UnreleasableBuffer(IByteBuffer buffer)
{
Contract.Requires(buffer != null);
return new UnreleasableByteBuffer(buffer);
}
public static IByteBuffer Unreleasable(this IByteBuffer buffer)
{
Contract.Requires(buffer != null);
return buffer is UnreleasableByteBuffer ? buffer : UnreleasableBuffer(buffer);
}
/// <summary>
/// Creates a new big-endian buffer whose content is a copy of the specified <see cref="array" />.
/// The new buffer's <see cref="IByteBuffer.ReaderIndex" /> and <see cref="IByteBuffer.WriterIndex" />

Просмотреть файл

@ -0,0 +1,54 @@

namespace DotNetty.Buffers
{
using DotNetty.Common;
sealed class UnreleasableByteBuffer : WrappedByteBuffer
{
SwappedByteBuffer swappedBuffer;
internal UnreleasableByteBuffer(IByteBuffer buf)
: base(buf)
{}
public override IByteBuffer WithOrder(ByteOrder endianness)
{
if (this.Order == endianness)
{
return this;
}
SwappedByteBuffer buffer = this.swappedBuffer;
if (buffer != null)
{
return this.swappedBuffer;
}
buffer = new SwappedByteBuffer(this);
this.swappedBuffer = buffer;
return this.swappedBuffer;
}
public override IByteBuffer ReadSlice(int length) => new UnreleasableByteBuffer(this.Buf.ReadSlice(length));
public override IByteBuffer Slice() => new UnreleasableByteBuffer(this.Buf.Slice());
public override IByteBuffer Slice(int index, int length) => new UnreleasableByteBuffer(this.Buf.Slice(index, length));
public override IByteBuffer Duplicate() => new UnreleasableByteBuffer(this.Buf.Duplicate());
public override IReferenceCounted Retain() => this;
public override IReferenceCounted Retain(int increment) => this;
public override IReferenceCounted Touch() => this;
public override IReferenceCounted Touch(object hint) => this;
public override bool Release() => false;
public override bool Release(int decrement) => false;
}
}

Просмотреть файл

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{6AE7CF76-971C-428B-853A-DBE73A861E60}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>DotNetty.Codecs.Redis</RootNamespace>
<AssemblyName>DotNetty.Codecs.Redis</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\SharedAssemblyInfo.cs">
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="Messages\ArrayHeaderRedisMessage.cs" />
<Compile Include="Messages\ArrayRedisMessage.cs" />
<Compile Include="Messages\BulkStringHeaderRedisMessage.cs" />
<Compile Include="Messages\BulkStringRedisContent.cs" />
<Compile Include="Messages\ErrorRedisMessage.cs" />
<Compile Include="Messages\FullBulkStringRedisMessage.cs" />
<Compile Include="Messages\IBulkStringRedisContent.cs" />
<Compile Include="Messages\IFullBulkStringRedisMessage.cs" />
<Compile Include="Messages\ILastBulkStringRedisContent.cs" />
<Compile Include="Messages\IntegerRedisMessage.cs" />
<Compile Include="Messages\IRedisMessage.cs" />
<Compile Include="IRedisMessagePool.cs" />
<Compile Include="Messages\LastBulkStringRedisContent.cs" />
<Compile Include="MessageAggregationException.cs" />
<Compile Include="RedisArrayAggregator.cs" />
<Compile Include="RedisBulkStringAggregator.cs" />
<Compile Include="RedisCodecException.cs" />
<Compile Include="RedisCodecUtil.cs" />
<Compile Include="RedisConstants.cs" />
<Compile Include="FixedRedisMessagePool.cs" />
<Compile Include="Messages\RedisMessageType.cs" />
<Compile Include="Messages\SimpleStringRedisMessage.cs" />
<Compile Include="Messages\AbstractStringRedisMessage.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RedisDecoder.cs" />
<Compile Include="RedisEncoder.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\DotNetty.Buffers\DotNetty.Buffers.csproj">
<Project>{5de3c557-48bf-4cdb-9f47-474d343dd841}</Project>
<Name>DotNetty.Buffers</Name>
</ProjectReference>
<ProjectReference Include="..\DotNetty.Codecs\DotNetty.Codecs.csproj">
<Project>{2abd244e-ef8f-460d-9c30-39116499e6e4}</Project>
<Name>DotNetty.Codecs</Name>
</ProjectReference>
<ProjectReference Include="..\DotNetty.Common\DotNetty.Common.csproj">
<Project>{de58fe41-5e99-44e5-86bc-fc9ed8761daf}</Project>
<Name>DotNetty.Common</Name>
</ProjectReference>
<ProjectReference Include="..\DotNetty.Transport\DotNetty.Transport.csproj">
<Project>{8218c9ee-0a4a-432f-a12a-b54202f97b05}</Project>
<Name>DotNetty.Transport</Name>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="DotNetty.Codecs.Redis.nuspec" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

Просмотреть файл

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="utf-8"?>
<package xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<metadata xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
<id>@project@</id>
<title>@project@@title@</title>
<version>@build.number@</version>
<authors>@authors@</authors>
<owners>@authors@</owners>
<description>Redis codec support for DotNetty.</description>
<licenseUrl>https://github.com/Azure/DotNetty/blob/master/LICENSE.txt</licenseUrl>
<projectUrl>https://github.com/Azure/DotNetty/</projectUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<releaseNotes>@releaseNotes@</releaseNotes>
<copyright>@copyright@</copyright>
<tags>@tags@ Redis codecs</tags>
@dependencies@
@references@
<frameworkAssemblies>
<frameworkAssembly assemblyName="System.Core" targetFramework=".NETFramework4.5" />
<frameworkAssembly assemblyName="System" targetFramework=".NETFramework4.5" />
</frameworkAssemblies>
</metadata>
</package>

Просмотреть файл

@ -0,0 +1,140 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Text;
using DotNetty.Buffers;
using DotNetty.Codecs.Redis.Messages;
public sealed class FixedRedisMessagePool : IRedisMessagePool
{
readonly IReadOnlyDictionary<string, SimpleStringRedisMessage> stringToSimpleStringMessages;
readonly IReadOnlyDictionary<IByteBuffer, SimpleStringRedisMessage> byteBufferToSimpleStringMessages;
readonly IReadOnlyDictionary<string, ErrorRedisMessage> stringToErrorMessages;
readonly IReadOnlyDictionary<IByteBuffer, ErrorRedisMessage> byteBufferToErrorMessages;
readonly IReadOnlyDictionary<long, IntegerRedisMessage> longToIntegerMessages;
readonly IReadOnlyDictionary<long, byte[]> longToBytes;
readonly IReadOnlyDictionary<IByteBuffer, IntegerRedisMessage> byteBufferToIntegerMessages;
static readonly FixedRedisMessagePool Instance;
static FixedRedisMessagePool()
{
Instance = new FixedRedisMessagePool();
}
public static IRedisMessagePool Default => Instance;
static readonly string[] SimpleStrings = {
"OK",
"PONG",
"QUEUED"
};
static readonly string[] Errors = {
"ERR",
"ERR index out of range",
"ERR no such key",
"ERR source and destination objects are the same",
"ERR syntax error",
"BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.",
"BUSYKEY Target key name already exists.",
"EXECABORT Transaction discarded because of previous errors.",
"LOADING Redis is loading the dataset in memory",
"MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.",
"MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. "
+ "Commands that may modify the data set are disabled. Please check Redis logs for details "
+ "about the error.",
"NOAUTH Authentication required.",
"NOREPLICAS Not enough good slaves to write.",
"NOSCRIPT No matching script. Please use EVAL.",
"OOM command not allowed when used memory > 'maxmemory'.",
"READONLY You can't write against a read only slave.",
"WRONGTYPE Operation against a key holding the wrong kind of value"
};
FixedRedisMessagePool()
{
var stringToSimpleStringValues = new Dictionary<string, SimpleStringRedisMessage>();
var byteBufferToSimpleStringValues = new Dictionary<IByteBuffer, SimpleStringRedisMessage>();
foreach (string simpleString in SimpleStrings)
{
IByteBuffer key = Unpooled
.WrappedBuffer(Encoding.UTF8.GetBytes(simpleString))
.Unreleasable();
var redisMessage = new SimpleStringRedisMessage(simpleString);
stringToSimpleStringValues.Add(simpleString, redisMessage);
byteBufferToSimpleStringValues.Add(key, redisMessage);
}
this.stringToSimpleStringMessages = new ReadOnlyDictionary<string, SimpleStringRedisMessage>(stringToSimpleStringValues);
this.byteBufferToSimpleStringMessages = new ReadOnlyDictionary<IByteBuffer, SimpleStringRedisMessage>(byteBufferToSimpleStringValues);
var errorToErrorValues = new Dictionary<string, ErrorRedisMessage>();
var byteBufferToErrorValues = new Dictionary<IByteBuffer, ErrorRedisMessage>();
foreach (string error in Errors)
{
IByteBuffer key = Unpooled
.WrappedBuffer(Encoding.UTF8.GetBytes(error))
.Unreleasable();
var redisMessage = new ErrorRedisMessage(error);
errorToErrorValues.Add(error, redisMessage);
byteBufferToErrorValues.Add(key, redisMessage);
}
this.stringToErrorMessages = new ReadOnlyDictionary<string, ErrorRedisMessage>(errorToErrorValues);
this.byteBufferToErrorMessages = new ReadOnlyDictionary<IByteBuffer, ErrorRedisMessage>(byteBufferToErrorValues);
var longToIntegerValues = new Dictionary<long, IntegerRedisMessage>();
var longToByteBufferValues = new Dictionary<long, byte[]>();
var byteBufferToIntegerValues = new Dictionary<IByteBuffer, IntegerRedisMessage>();
for (long value = MinimumCachedIntegerNumber; value < MaximumCachedIntegerNumber; value++)
{
byte[] bytes = RedisCodecUtil.LongToAsciiBytes(value);
IByteBuffer key = Unpooled
.WrappedBuffer(bytes)
.Unreleasable();
var redisMessage = new IntegerRedisMessage(value);
longToIntegerValues.Add(value, redisMessage);
longToByteBufferValues.Add(value, bytes);
byteBufferToIntegerValues.Add(key, redisMessage);
}
this.longToIntegerMessages = new ReadOnlyDictionary<long, IntegerRedisMessage>(longToIntegerValues);
this.longToBytes = new ReadOnlyDictionary<long, byte[]>(longToByteBufferValues);
this.byteBufferToIntegerMessages = new ReadOnlyDictionary<IByteBuffer, IntegerRedisMessage>(byteBufferToIntegerValues);
}
static readonly long MinimumCachedIntegerNumber = RedisConstants.NullValue; // inclusive
const long MaximumCachedIntegerNumber = 128; // exclusive
public bool TryGetMessage(string content, out SimpleStringRedisMessage message)
=> this.stringToSimpleStringMessages.TryGetValue(content, out message);
public bool TryGetMessage(IByteBuffer content, out SimpleStringRedisMessage message)
=> this.byteBufferToSimpleStringMessages.TryGetValue(content, out message);
public bool TryGetMessage(string content, out ErrorRedisMessage message)
=> this.stringToErrorMessages.TryGetValue(content, out message);
public bool TryGetMessage(IByteBuffer content, out ErrorRedisMessage message)
=> this.byteBufferToErrorMessages.TryGetValue(content, out message);
public bool TryGetMessage(long value, out IntegerRedisMessage message)
=> this.longToIntegerMessages.TryGetValue(value, out message);
public bool TryGetMessage(IByteBuffer content, out IntegerRedisMessage message)
=> this.byteBufferToIntegerMessages.TryGetValue(content, out message);
public bool TryGetBytes(long value, out byte[] bytes)
=> this.longToBytes.TryGetValue(value, out bytes);
}
}

Просмотреть файл

@ -0,0 +1,25 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using DotNetty.Buffers;
using DotNetty.Codecs.Redis.Messages;
public interface IRedisMessagePool
{
bool TryGetMessage(string content, out SimpleStringRedisMessage message);
bool TryGetMessage(IByteBuffer content, out SimpleStringRedisMessage message);
bool TryGetMessage(string content, out ErrorRedisMessage message);
bool TryGetMessage(IByteBuffer content, out ErrorRedisMessage message);
bool TryGetMessage(IByteBuffer content, out IntegerRedisMessage message);
bool TryGetMessage(long value, out IntegerRedisMessage message);
bool TryGetBytes(long value, out byte[] bytes);
}
}

Просмотреть файл

@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using System;
public class MessageAggregationException : InvalidOperationException
{
public MessageAggregationException() { }
public MessageAggregationException(string message)
: this(message, null)
{ }
public MessageAggregationException(Exception exception)
: this(null, exception)
{ }
public MessageAggregationException(string message, Exception exception)
: base(message, exception)
{ }
}
}

Просмотреть файл

@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
using System.Diagnostics.Contracts;
public abstract class AbstractStringRedisMessage : IRedisMessage
{
protected AbstractStringRedisMessage(string content)
{
Contract.Requires(content != null);
this.Content = content;
}
public string Content { get; }
}
}

Просмотреть файл

@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
using System.Diagnostics.Contracts;
public sealed class ArrayHeaderRedisMessage : IRedisMessage
{
public ArrayHeaderRedisMessage(long length)
{
Contract.Requires(length >= RedisConstants.NullValue);
this.Length = length;
}
public long Length { get; }
public bool IsNull => this.Length == RedisConstants.NullValue;
public override string ToString() => $"{nameof(ArrayHeaderRedisMessage)}[length={this.Length}]";
}
}

Просмотреть файл

@ -0,0 +1,50 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using DotNetty.Common;
using DotNetty.Common.Utilities;
public sealed class ArrayRedisMessage : AbstractReferenceCounted, IRedisMessage
{
public static readonly ArrayRedisMessage Null = new ArrayRedisMessage();
public static readonly ArrayRedisMessage Empty = new ArrayRedisMessage();
ArrayRedisMessage()
: this(new List<IRedisMessage>())
{ }
public ArrayRedisMessage(IList<IRedisMessage> childMessages)
{
Contract.Requires(childMessages != null);
this.Children = childMessages;
}
public IList<IRedisMessage> Children { get; }
public bool IsNull => this == Null;
public override IReferenceCounted Touch(object hint)
{
foreach (IRedisMessage message in this.Children)
{
ReferenceCountUtil.Touch(message);
}
return this;
}
protected override void Deallocate()
{
foreach (IRedisMessage message in this.Children)
{
ReferenceCountUtil.Release(message);
}
}
}
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
using System.Diagnostics.Contracts;
public sealed class BulkStringHeaderRedisMessage : IRedisMessage
{
public BulkStringHeaderRedisMessage(int bulkStringLength)
{
Contract.Requires(bulkStringLength > 0);
this.BulkStringLength = bulkStringLength;
}
public int BulkStringLength { get; }
public bool IsNull => false;
}
}

Просмотреть файл

@ -0,0 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
using DotNetty.Buffers;
public sealed class BulkStringRedisContent : DefaultByteBufferHolder, IBulkStringRedisContent
{
public BulkStringRedisContent(IByteBuffer buffer)
: base(buffer)
{ }
public override IByteBufferHolder Copy()
{
IByteBuffer buffer = this.Content.Copy();
return new BulkStringRedisContent(buffer);
}
public override IByteBufferHolder Duplicate()
{
IByteBuffer buffer = this.Content.Duplicate();
return new BulkStringRedisContent(buffer);
}
public override string ToString() => $"{nameof(BulkStringRedisContent)}[content={this.Content}]";
}
}

Просмотреть файл

@ -0,0 +1,14 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
public sealed class ErrorRedisMessage : AbstractStringRedisMessage
{
public ErrorRedisMessage(string content)
: base(content)
{ }
public override string ToString() => $"{nameof(ErrorRedisMessage)}[content={this.Content}]";
}
}

Просмотреть файл

@ -0,0 +1,62 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
using DotNetty.Buffers;
using DotNetty.Common;
public sealed class FullBulkStringRedisMessage : DefaultByteBufferHolder, IFullBulkStringRedisMessage
{
public static readonly IFullBulkStringRedisMessage Null = new NullOrEmptyFullBulkStringRedisMessage(true);
public static readonly IFullBulkStringRedisMessage Empty = new NullOrEmptyFullBulkStringRedisMessage(false);
public FullBulkStringRedisMessage(IByteBuffer buffer)
: base(buffer)
{ }
public bool IsNull => false;
public override IByteBufferHolder Copy() => new FullBulkStringRedisMessage(this.Content.Copy());
public override IByteBufferHolder Duplicate() => new FullBulkStringRedisMessage(this.Content.Duplicate());
public override string ToString() => $"{nameof(FullBulkStringRedisMessage)}[content={this.Content}]";
sealed class NullOrEmptyFullBulkStringRedisMessage : IFullBulkStringRedisMessage
{
internal NullOrEmptyFullBulkStringRedisMessage(bool isNull)
: this(Unpooled.Empty, isNull)
{ }
NullOrEmptyFullBulkStringRedisMessage(IByteBuffer content, bool isNull)
{
this.Content = content;
this.IsNull = isNull;
}
public bool IsNull { get; }
public int ReferenceCount => 1;
public IByteBuffer Content { get; }
public IByteBufferHolder Copy() => this;
public IByteBufferHolder Duplicate() => this;
public IReferenceCounted Touch() => this;
public IReferenceCounted Touch(object hint) => this;
public IReferenceCounted Retain() => this;
public IReferenceCounted Retain(int increment) => this;
public bool Release() => false;
public bool Release(int decrement) => false;
}
}
}

Просмотреть файл

@ -0,0 +1,11 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
using DotNetty.Buffers;
public interface IBulkStringRedisContent : IRedisMessage, IByteBufferHolder
{
}
}

Просмотреть файл

@ -0,0 +1,10 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
public interface IFullBulkStringRedisMessage : ILastBulkStringRedisContent
{
bool IsNull { get; }
}
}

Просмотреть файл

@ -0,0 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
public interface ILastBulkStringRedisContent : IBulkStringRedisContent
{
}
}

Просмотреть файл

@ -0,0 +1,12 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
/// <summary>
/// Base interface for redis messages.
/// </summary>
public interface IRedisMessage
{
}
}

Просмотреть файл

@ -0,0 +1,17 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
public sealed class IntegerRedisMessage : IRedisMessage
{
public IntegerRedisMessage(long value)
{
this.Value = value;
}
public long Value { get; }
public override string ToString() => $"{nameof(IntegerRedisMessage)}[value={this.Value}]";
}
}

Просмотреть файл

@ -0,0 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
using DotNetty.Buffers;
public sealed class LastBulkStringRedisContent : DefaultByteBufferHolder, ILastBulkStringRedisContent
{
public LastBulkStringRedisContent(IByteBuffer buffer)
: base(buffer)
{ }
public override IByteBufferHolder Copy()
{
IByteBuffer buffer = this.Content.Copy();
return new LastBulkStringRedisContent(buffer);
}
public override IByteBufferHolder Duplicate()
{
IByteBuffer buffer = this.Content.Duplicate();
return new LastBulkStringRedisContent(buffer);
}
public override string ToString() => $"{nameof(LastBulkStringRedisContent)}[content={this.Content}]";
}
}

Просмотреть файл

@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
/// <summary>
/// Type of RESP (REdis Serialization Protocol)
/// see http://redis.io/topics/protocol
/// </summary>
public enum RedisMessageType
{
SimpleString = RedisCodecUtil.RedisSimpleString,
Error = RedisCodecUtil.RedisError,
Integer = RedisCodecUtil.RedisInteger,
BulkString = RedisCodecUtil.RedisBulkString,
ArrayHeader = RedisCodecUtil.RedisArray,
Array = RedisCodecUtil.RedisArray
}
}

Просмотреть файл

@ -0,0 +1,14 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Messages
{
public sealed class SimpleStringRedisMessage : AbstractStringRedisMessage
{
public SimpleStringRedisMessage(string content)
: base(content)
{ }
public override string ToString() => $"{nameof(SimpleStringRedisMessage)}[content={this.Content}]";
}
}

Просмотреть файл

@ -0,0 +1,5 @@
using System.Reflection;
using System.Resources;
[assembly: NeutralResourcesLanguage("en-US")]
[assembly: AssemblyMetadata("Serviceable", "True")]

Просмотреть файл

@ -0,0 +1,98 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using DotNetty.Codecs.Redis.Messages;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
public sealed class RedisArrayAggregator : MessageToMessageDecoder<IRedisMessage>
{
readonly Stack<AggregateState> depths = new Stack<AggregateState>(4);
protected override void Decode(IChannelHandlerContext context, IRedisMessage message, List<object> output)
{
Contract.Requires(context != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
if (message is ArrayHeaderRedisMessage)
{
message = this.DecodeRedisArrayHeader((ArrayHeaderRedisMessage)message);
if (message == null)
{
return;
}
}
else
{
ReferenceCountUtil.Retain(message);
}
while (this.depths.Count > 0)
{
AggregateState current = this.depths.Peek();
current.Children.Add(message);
// if current aggregation completed, go to parent aggregation.
if (current.Children.Count == current.Length)
{
message = new ArrayRedisMessage(current.Children);
this.depths.Pop();
}
else
{
// not aggregated yet. try next time.
return;
}
}
output.Add(message);
}
IRedisMessage DecodeRedisArrayHeader(ArrayHeaderRedisMessage header)
{
if (header.IsNull)
{
return ArrayRedisMessage.Null;
}
if (header.Length == 0)
{
return ArrayRedisMessage.Empty;
}
if (header.Length > 0)
{
// Currently, this codec doesn't support `long` length for arrays because Java's List.size() is int.
if (header.Length > int.MaxValue)
{
throw new CodecException(
$"This codec doesn't support longer length than {int.MaxValue}");
}
// start aggregating array
this.depths.Push(new AggregateState((int)header.Length));
return null;
}
throw new CodecException($"Bad length: {header.Length}");
}
class AggregateState
{
internal int Length { get; }
internal readonly List<IRedisMessage> Children;
internal AggregateState(int length)
{
this.Length = length;
this.Children = new List<IRedisMessage>(length);
}
}
}
}

Просмотреть файл

@ -0,0 +1,219 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using DotNetty.Buffers;
using DotNetty.Codecs.Redis.Messages;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
public sealed class RedisBulkStringAggregator : MessageToMessageDecoder<IRedisMessage>
{
const int DefaultMaximumCumulationBufferComponents = 1024;
readonly int maximumContentLength;
int maximumCumulationBufferComponents;
IByteBufferHolder currentMessage;
public RedisBulkStringAggregator()
: this(RedisConstants.MaximumMessageLength)
{ }
RedisBulkStringAggregator(int maximumContentLength)
{
Contract.Requires(maximumContentLength > 0);
this.maximumContentLength = maximumContentLength;
this.maximumCumulationBufferComponents = DefaultMaximumCumulationBufferComponents;
this.currentMessage = null;
}
public int MaximumCumulationBufferComponents
{
get
{
return this.maximumCumulationBufferComponents;
}
set
{
Contract.Requires(value >= 2);
if (this.currentMessage != null)
{
throw new InvalidOperationException(
"Decoder properties cannot be changed once the decoder is added to a pipeline.");
}
this.maximumCumulationBufferComponents = value;
}
}
public override bool AcceptInboundMessage(object message)
{
// No need to match last and full types because they are subset of first and middle types.
if (!base.AcceptInboundMessage(message))
{
return false;
}
// Already checked above, this cast is safe
var redisMessage = (IRedisMessage)message;
return (IsContentMessage(redisMessage)
|| IsStartMessage(redisMessage))
&& !IsAggregated(redisMessage);
}
protected override void Decode(IChannelHandlerContext context, IRedisMessage message, List<object> output)
{
Contract.Requires(context != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
if (IsStartMessage(message)) // Start header
{
if (this.currentMessage != null)
{
this.currentMessage.Release();
this.currentMessage = null;
throw new MessageAggregationException(
$"Start message {message} should have current buffer to be null.");
}
var startMessage = (BulkStringHeaderRedisMessage)message;
if (IsContentLengthInvalid(startMessage, this.maximumContentLength))
{
this.InvokeHandleOversizedMessage(context, startMessage);
}
// A streamed message -initialize the cumulative buffer, and wait for incoming chunks.
CompositeByteBuffer buffer = context.Allocator.CompositeBuffer(this.maximumCumulationBufferComponents);
this.currentMessage = BeginAggregation(buffer);
}
else if (IsContentMessage(message)) // Content
{
if (this.currentMessage == null)
{
// it is possible that a TooLongFrameException was already thrown but we can still discard data
// until the begging of the next request/response.
return;
}
// Merge the received chunk into the content of the current message.
var content = (CompositeByteBuffer)this.currentMessage.Content;
var bufferHolder = (IByteBufferHolder)message;
// Handle oversized message.
if (content.ReadableBytes > this.maximumContentLength - bufferHolder.Content.ReadableBytes)
{
// By convention, full message type extends first message type.
// ReSharper disable once PossibleInvalidCastException
var startMessage = (BulkStringHeaderRedisMessage)message;
this.InvokeHandleOversizedMessage(context, startMessage);
return;
}
// Append the content of the chunk.
AppendPartialContent(content, bufferHolder.Content);
bool isLast = IsLastContentMessage(message);
if (isLast)
{
// All done
output.Add(this.currentMessage);
this.currentMessage = null;
}
}
else
{
throw new MessageAggregationException($"Unexpected message {message}");
}
}
static void AppendPartialContent(CompositeByteBuffer content, IByteBuffer partialContent)
{
Contract.Requires(content != null);
Contract.Requires(partialContent != null);
if (!partialContent.IsReadable())
{
return;
}
var buffer = (IByteBuffer)partialContent.Retain();
content.AddComponent(buffer);
// Note that WriterIndex must be manually increased
content.SetWriterIndex(content.WriterIndex + buffer.ReadableBytes);
}
void InvokeHandleOversizedMessage(IChannelHandlerContext context, BulkStringHeaderRedisMessage startMessage)
{
Contract.Requires(context != null);
Contract.Requires(startMessage != null);
this.currentMessage = null;
try
{
context.FireExceptionCaught(
new TooLongFrameException($"Content length exceeded {this.maximumContentLength} bytes."));
}
finally
{
// Release the message in case it is a full one.
ReferenceCountUtil.Release(startMessage);
}
}
static bool IsStartMessage(IRedisMessage message)
{
Contract.Requires(message != null);
return message is BulkStringHeaderRedisMessage
&& !IsAggregated(message);
}
static bool IsContentMessage(IRedisMessage message)
{
Contract.Requires(message != null);
return message is IBulkStringRedisContent;
}
static FullBulkStringRedisMessage BeginAggregation(IByteBuffer byteBuffer)
{
Contract.Requires(byteBuffer != null);
return new FullBulkStringRedisMessage(byteBuffer);
}
static bool IsLastContentMessage(IRedisMessage message)
{
Contract.Requires(message != null);
return message is LastBulkStringRedisContent;
}
static bool IsContentLengthInvalid(BulkStringHeaderRedisMessage start, int expectedMaximumContentLength)
{
Contract.Requires(start != null);
return start.BulkStringLength > expectedMaximumContentLength;
}
static bool IsAggregated(IRedisMessage message)
{
Contract.Requires(message != null);
return message is IFullBulkStringRedisMessage;
}
}
}

Просмотреть файл

@ -0,0 +1,22 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using System;
public sealed class RedisCodecException : CodecException
{
public RedisCodecException(string message)
: this(message, null)
{ }
public RedisCodecException(Exception exception)
: this(null, exception)
{ }
public RedisCodecException(string message, Exception exception)
: base(message, exception)
{ }
}
}

Просмотреть файл

@ -0,0 +1,77 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using System;
using System.Globalization;
using System.Text;
using DotNetty.Buffers;
using DotNetty.Codecs.Redis.Messages;
static class RedisCodecUtil
{
internal const char RedisSimpleString = '+';
internal const char RedisError = '-';
internal const char RedisInteger = ':';
internal const char RedisBulkString = '$';
internal const char RedisArray = '*';
internal static RedisMessageType ParseMessageType(byte byteCode)
{
switch ((char)byteCode)
{
case RedisSimpleString:
return RedisMessageType.SimpleString;
case RedisError:
return RedisMessageType.Error;
case RedisInteger:
return RedisMessageType.Integer;
case RedisBulkString:
return RedisMessageType.BulkString;
case RedisArray:
return RedisMessageType.ArrayHeader;
default:
throw new RedisCodecException($"Unknown RedisMessageType code:{byteCode}");
}
}
// todo: can optimize once ByteBufferUtil.WriteAscii is implemented
internal static byte[] LongToAsciiBytes(long value) =>
Encoding.ASCII.GetBytes(Convert.ToString(value, CultureInfo.InvariantCulture));
internal static short ToShort(char first, char second, ByteOrder byteOrder)
{
switch (byteOrder)
{
case ByteOrder.BigEndian:
return (short)((second << 8) | first);
case ByteOrder.LittleEndian:
return (short)((first << 8) | second);
default:
throw new InvalidOperationException($"Unknown ByteOrder type {byteOrder}");
}
}
internal static byte[] GetBytes(short value, ByteOrder byteOrder = ByteOrder.BigEndian)
{
switch (byteOrder)
{
case ByteOrder.BigEndian:
return new[]
{
(byte)(value & 0xff),
(byte)((value >> 8) & 0xff)
};
case ByteOrder.LittleEndian:
return new[]
{
(byte)((value >> 8) & 0xff),
(byte)(value & 0xff)
};
default:
throw new InvalidOperationException($"Unknown ByteOrder type {byteOrder}");
}
}
}
}

Просмотреть файл

@ -0,0 +1,36 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using System;
using DotNetty.Buffers;
static class RedisConstants
{
internal static readonly ByteOrder DefaultByteOrder =
BitConverter.IsLittleEndian
? ByteOrder.LittleEndian
: ByteOrder.BigEndian;
internal static readonly int MaximumInlineMessageLength = 1024 * 64;
internal static readonly int TypeLength = 1;
internal static readonly int EndOfLineLength = 2;
internal static readonly int NullLength = 2;
internal static readonly int NullValue = -1;
internal static readonly int MaximumMessageLength = 512 * 1024 * 1024; // 512MB
internal static readonly int PositiveLongValueMaximumLength = 19; // length of Long.MAX_VALUE
internal static readonly int LongValueMaximumLength = PositiveLongValueMaximumLength + 1; // +1 is sign
internal static readonly short Null = RedisCodecUtil.ToShort('-', '1', DefaultByteOrder);
internal static readonly short EndOfLine = RedisCodecUtil.ToShort('\r', '\n', DefaultByteOrder);
}
}

Просмотреть файл

@ -0,0 +1,420 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Text;
using DotNetty.Buffers;
using DotNetty.Codecs.Redis.Messages;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
public sealed class RedisDecoder : ByteToMessageDecoder
{
readonly IRedisMessagePool messagePool;
readonly int maximumInlineMessageLength;
readonly ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor();
enum State
{
DecodeType,
DecodeInline, // SIMPLE_STRING, ERROR, INTEGER
DecodeLength, // BULK_STRING, ARRAY_HEADER
DecodeBulkStringEndOfLine,
DecodeBulkStringContent,
}
// current decoding states
State state = State.DecodeType;
RedisMessageType messageType;
int remainingBulkLength;
public RedisDecoder()
: this(RedisConstants.MaximumInlineMessageLength, FixedRedisMessagePool.Default)
{ }
public RedisDecoder(int maximumInlineMessageLength, IRedisMessagePool messagePool)
{
Contract.Requires(maximumInlineMessageLength > 0
&& maximumInlineMessageLength <= RedisConstants.MaximumMessageLength);
Contract.Requires(messagePool != null);
this.maximumInlineMessageLength = maximumInlineMessageLength;
this.messagePool = messagePool;
}
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
Contract.Requires(context != null);
Contract.Requires(input != null);
Contract.Requires(output != null);
try
{
while (true)
{
switch (this.state)
{
case State.DecodeType:
if (!this.DecodeType(input))
{
return;
}
break;
case State.DecodeInline:
if (!this.DecodeInline(input, output))
{
return;
}
break;
case State.DecodeLength:
if (!this.DecodeLength(input, output))
{
return;
}
break;
case State.DecodeBulkStringEndOfLine:
if (!this.DecodeBulkStringEndOfLine(input, output))
{
return;
}
break;
case State.DecodeBulkStringContent:
if (!this.DecodeBulkStringContent(input, output))
{
return;
}
break;
default:
throw new RedisCodecException($"Unknown state: {this.state}");
}
}
}
catch (RedisCodecException)
{
this.ResetDecoder();
throw;
}
catch (Exception exception)
{
this.ResetDecoder();
throw new RedisCodecException(exception);
}
}
IRedisMessage ReadInlineMessage(RedisMessageType redisMessageType, IByteBuffer byteBuffer)
{
Contract.Requires(byteBuffer != null);
switch (redisMessageType)
{
case RedisMessageType.SimpleString:
return this.GetSimpleStringMessage(byteBuffer);
case RedisMessageType.Error:
return this.GetErrorMessage(byteBuffer);
case RedisMessageType.Integer:
return this.GetIntegerMessage(byteBuffer);
default:
throw new RedisCodecException(
$"Message type {redisMessageType} must be inline messageType of SimpleString, Error or Integer");
}
}
SimpleStringRedisMessage GetSimpleStringMessage(IByteBuffer byteBuffer)
{
Contract.Requires(byteBuffer != null);
SimpleStringRedisMessage message;
if (!this.messagePool.TryGetMessage(byteBuffer, out message))
{
message = new SimpleStringRedisMessage(byteBuffer.ToString(Encoding.UTF8));
}
return message;
}
ErrorRedisMessage GetErrorMessage(IByteBuffer byteBuffer)
{
Contract.Requires(byteBuffer != null);
ErrorRedisMessage message;
if (!this.messagePool.TryGetMessage(byteBuffer, out message))
{
message = new ErrorRedisMessage(byteBuffer.ToString(Encoding.UTF8));
}
return message;
}
IntegerRedisMessage GetIntegerMessage(IByteBuffer byteBuffer)
{
IntegerRedisMessage message;
if (!this.messagePool.TryGetMessage(byteBuffer, out message))
{
message = new IntegerRedisMessage(this.ParseNumber(byteBuffer));
}
return message;
}
bool DecodeType(IByteBuffer byteBuffer)
{
Contract.Requires(byteBuffer != null);
if (!byteBuffer.IsReadable())
{
return false;
}
RedisMessageType redisMessageType = RedisCodecUtil.ParseMessageType(byteBuffer.ReadByte());
this.state = IsInline(redisMessageType)
? State.DecodeInline
: State.DecodeLength;
this.messageType = redisMessageType;
return true;
}
static bool IsInline(RedisMessageType messageType) =>
messageType == RedisMessageType.SimpleString
|| messageType == RedisMessageType.Error
|| messageType == RedisMessageType.Integer;
bool DecodeInline(IByteBuffer byteBuffer, ICollection<object> output)
{
Contract.Requires(byteBuffer != null);
Contract.Requires(output != null);
IByteBuffer buffer = ReadLine(byteBuffer);
if (buffer == null)
{
if (byteBuffer.ReadableBytes > this.maximumInlineMessageLength)
{
throw new RedisCodecException(
$"Length: {byteBuffer.ReadableBytes} (expected: <= {this.maximumInlineMessageLength})");
}
return false;
}
IRedisMessage message = this.ReadInlineMessage(this.messageType, buffer);
output.Add(message);
this.ResetDecoder();
return true;
}
bool DecodeLength(IByteBuffer byteBuffer, ICollection<object> output)
{
Contract.Requires(byteBuffer != null);
Contract.Requires(output != null);
IByteBuffer lineByteBuffer = ReadLine(byteBuffer);
if (lineByteBuffer == null)
{
return false;
}
long length = this.ParseNumber(lineByteBuffer);
if (length < RedisConstants.NullValue)
{
throw new RedisCodecException(
$"Length: {length} (expected: >= {RedisConstants.NullValue})");
}
switch (this.messageType)
{
case RedisMessageType.ArrayHeader:
output.Add(new ArrayHeaderRedisMessage(length));
this.ResetDecoder();
return true;
case RedisMessageType.BulkString:
if (length > RedisConstants.MaximumMessageLength)
{
throw new RedisCodecException(
$"Length: {length} (expected: <= {RedisConstants.MaximumMessageLength})");
}
this.remainingBulkLength = (int)length; // range(int) is already checked.
return this.DecodeBulkString(byteBuffer, output);
default:
throw new RedisCodecException(
$"Bad messageType: {this.messageType}, expecting ArrayHeader or BulkString.");
}
}
bool DecodeBulkString(IByteBuffer byteBuffer, ICollection<object> output)
{
Contract.Requires(byteBuffer != null);
Contract.Requires(output != null);
if (this.remainingBulkLength == RedisConstants.NullValue) // $-1\r\n
{
output.Add(FullBulkStringRedisMessage.Null);
this.ResetDecoder();
return true;
}
if (this.remainingBulkLength == 0)
{
this.state = State.DecodeBulkStringEndOfLine;
return this.DecodeBulkStringEndOfLine(byteBuffer, output);
}
// expectedBulkLength is always positive.
output.Add(new BulkStringHeaderRedisMessage(this.remainingBulkLength));
this.state = State.DecodeBulkStringContent;
return this.DecodeBulkStringContent(byteBuffer, output);
}
bool DecodeBulkStringContent(IByteBuffer byteBuffer, ICollection<object> output)
{
Contract.Requires(byteBuffer != null);
Contract.Requires(output != null);
int readableBytes = byteBuffer.ReadableBytes;
if (readableBytes == 0)
{
return false;
}
// if this is last frame.
if (readableBytes >= this.remainingBulkLength + RedisConstants.EndOfLineLength)
{
IByteBuffer content = byteBuffer.ReadSlice(this.remainingBulkLength);
ReadEndOfLine(byteBuffer);
// Only call retain after readEndOfLine(...) as the method may throw an exception.
output.Add(new LastBulkStringRedisContent((IByteBuffer)content.Retain()));
this.ResetDecoder();
return true;
}
// chunked write.
int toRead = Math.Min(this.remainingBulkLength, readableBytes);
this.remainingBulkLength -= toRead;
IByteBuffer buffer = byteBuffer.ReadSlice(toRead);
output.Add(new BulkStringRedisContent((IByteBuffer)buffer.Retain()));
return true;
}
// $0\r\n <here> \r\n
bool DecodeBulkStringEndOfLine(IByteBuffer byteBuffer, ICollection<object> output)
{
Contract.Requires(byteBuffer != null);
Contract.Requires(output != null);
if (byteBuffer.ReadableBytes < RedisConstants.EndOfLineLength)
{
return false;
}
ReadEndOfLine(byteBuffer);
output.Add(FullBulkStringRedisMessage.Empty);
this.ResetDecoder();
return true;
}
static IByteBuffer ReadLine(IByteBuffer byteBuffer)
{
Contract.Requires(byteBuffer != null);
if (!byteBuffer.IsReadable(RedisConstants.EndOfLineLength))
{
return null;
}
int lfIndex = byteBuffer.ForEachByte(ByteProcessor.FIND_LF);
if (lfIndex < 0)
{
return null;
}
IByteBuffer buffer = byteBuffer.ReadSlice(lfIndex - byteBuffer.ReaderIndex - 1);
ReadEndOfLine(byteBuffer);
return buffer;
}
static void ReadEndOfLine(IByteBuffer byteBuffer)
{
Contract.Requires(byteBuffer != null);
short delim = byteBuffer.ReadShort();
if (RedisConstants.EndOfLine == delim)
{
return;
}
byte[] bytes = RedisCodecUtil.GetBytes(delim);
throw new RedisCodecException($"delimiter: [{bytes[0]},{bytes[1]}] (expected: \\r\\n)");
}
void ResetDecoder()
{
this.state = State.DecodeType;
this.remainingBulkLength = 0;
}
long ParseNumber(IByteBuffer byteBuffer)
{
int readableBytes = byteBuffer.ReadableBytes;
bool negative = readableBytes > 0
&& byteBuffer.GetByte(byteBuffer.ReaderIndex) == '-';
int extraOneByteForNegative = negative ? 1 : 0;
if (readableBytes <= extraOneByteForNegative)
{
throw new RedisCodecException(
$"No number to parse: {byteBuffer.ToString(Encoding.ASCII)}");
}
if (readableBytes > RedisConstants.PositiveLongValueMaximumLength + extraOneByteForNegative)
{
throw new RedisCodecException(
$"Too many characters to be a valid RESP Integer: {byteBuffer.ToString(Encoding.ASCII)}");
}
if (negative)
{
return -this.ParsePositiveNumber(byteBuffer.SkipBytes(extraOneByteForNegative));
}
return this.ParsePositiveNumber(byteBuffer);
}
long ParsePositiveNumber(IByteBuffer byteBuffer)
{
this.toPositiveLongProcessor.Reset();
byteBuffer.ForEachByte(this.toPositiveLongProcessor);
return this.toPositiveLongProcessor.Content;
}
class ToPositiveLongProcessor : ByteProcessor
{
public override bool Process(byte value)
{
if (!char.IsDigit((char)value))
{
throw new RedisCodecException($"Bad byte in number: {value}, expecting digits from 0 to 9");
}
this.Content = this.Content * 10 + (value - '0');
return true;
}
public long Content { get; private set; }
public void Reset() => this.Content = 0;
}
}
}

Просмотреть файл

@ -0,0 +1,307 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Text;
using DotNetty.Buffers;
using DotNetty.Codecs.Redis.Messages;
using DotNetty.Transport.Channels;
public sealed class RedisEncoder : MessageToMessageEncoder<IRedisMessage>
{
readonly IRedisMessagePool messagePool;
public RedisEncoder()
: this(FixedRedisMessagePool.Default)
{ }
public RedisEncoder(IRedisMessagePool messagePool)
{
Contract.Requires(messagePool != null);
this.messagePool = messagePool;
}
protected override void Encode(IChannelHandlerContext context, IRedisMessage message, List<object> output)
{
Contract.Requires(context != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
try
{
this.Write(context.Allocator, message, output);
}
catch (CodecException)
{
throw;
}
catch (Exception exception)
{
throw new CodecException(exception);
}
}
void Write(IByteBufferAllocator allocator, IRedisMessage message, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
if (message is SimpleStringRedisMessage)
{
Write(allocator, (SimpleStringRedisMessage)message, output);
return;
}
if (message is ErrorRedisMessage)
{
Write(allocator, (ErrorRedisMessage)message, output);
return;
}
if (message is IntegerRedisMessage)
{
this.Write(allocator, (IntegerRedisMessage)message, output);
return;
}
if (message is FullBulkStringRedisMessage)
{
this.Write(allocator, (FullBulkStringRedisMessage)message, output);
return;
}
if (message is IBulkStringRedisContent)
{
Write(allocator, (IBulkStringRedisContent)message, output);
return;
}
if (message is BulkStringHeaderRedisMessage)
{
this.Write(allocator, (BulkStringHeaderRedisMessage)message, output);
return;
}
if (message is ArrayHeaderRedisMessage)
{
this.Write(allocator, (ArrayHeaderRedisMessage)message, output);
return;
}
if (message is ArrayRedisMessage)
{
this.Write(allocator, (ArrayRedisMessage)message, output);
return;
}
throw new CodecException($"Unknown message type: {message}");
}
void Write(IByteBufferAllocator allocator, FullBulkStringRedisMessage message, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
IByteBuffer buffer = allocator.Buffer(
RedisConstants.TypeLength
+ (message.IsNull ? RedisConstants.NullLength : RedisConstants.LongValueMaximumLength)
+ RedisConstants.EndOfLineLength);
buffer.WriteByte((char)RedisMessageType.BulkString);
if (message.IsNull)
{
buffer.WriteShort(RedisConstants.Null);
buffer.WriteShort(RedisConstants.EndOfLine);
output.Add(buffer);
}
else
{
int readableBytes = message.Content.ReadableBytes;
byte[] bytes = this.NumberToBytes(readableBytes);
buffer.WriteBytes(bytes);
buffer.WriteShort(RedisConstants.EndOfLine);
output.Add(buffer);
output.Add(message.Content.Retain());
output.Add(allocator
.Buffer(RedisConstants.EndOfLineLength)
.WriteShort(RedisConstants.EndOfLine));
}
}
static void Write(IByteBufferAllocator allocator, IBulkStringRedisContent message, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
output.Add(message.Content.Retain());
if (message is ILastBulkStringRedisContent)
{
output.Add(allocator
.Buffer(RedisConstants.EndOfLineLength)
.WriteShort(RedisConstants.EndOfLine));
}
}
void Write(IByteBufferAllocator allocator, BulkStringHeaderRedisMessage message, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
IByteBuffer buffer = allocator.Buffer(
RedisConstants.TypeLength
+ (message.IsNull ? RedisConstants.NullLength : RedisConstants.LongValueMaximumLength)
+ RedisConstants.EndOfLineLength);
buffer.WriteByte((char)RedisMessageType.BulkString);
if (!message.IsNull)
{
byte[] bytes = this.NumberToBytes(message.BulkStringLength);
buffer.WriteBytes(bytes);
}
buffer.WriteShort(RedisConstants.EndOfLine);
output.Add(buffer);
}
static void Write(IByteBufferAllocator allocator, SimpleStringRedisMessage message, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
IByteBuffer buffer = WriteString(allocator, RedisMessageType.SimpleString, message.Content);
output.Add(buffer);
}
static void Write(IByteBufferAllocator allocator, ErrorRedisMessage message, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
IByteBuffer buffer = WriteString(allocator, RedisMessageType.Error, message.Content);
output.Add(buffer);
}
void Write(IByteBufferAllocator allocator, IntegerRedisMessage message, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
IByteBuffer buffer = allocator.Buffer(
RedisConstants.TypeLength
+ RedisConstants.LongValueMaximumLength
+ RedisConstants.EndOfLineLength);
// Header
buffer.WriteByte((char)RedisMessageType.Integer);
// Content
byte[] bytes = this.NumberToBytes(message.Value);
buffer.WriteBytes(bytes);
// EOL
buffer.WriteShort(RedisConstants.EndOfLine);
output.Add(buffer);
}
void Write(IByteBufferAllocator allocator, ArrayHeaderRedisMessage message, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
this.WriteArrayHeader(allocator, message.IsNull ? default(long?) : message.Length, output);
}
void Write(IByteBufferAllocator allocator, ArrayRedisMessage message, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(message != null);
Contract.Requires(output != null);
this.WriteArrayHeader(allocator, message.IsNull ? default(long?) : message.Children.Count, output);
if (message.IsNull)
{
return;
}
foreach (IRedisMessage childMessage in message.Children)
{
this.Write(allocator, childMessage, output);
}
}
void WriteArrayHeader(IByteBufferAllocator allocator, long? length, ICollection<object> output)
{
Contract.Requires(allocator != null);
Contract.Requires(output != null);
IByteBuffer buffer = allocator.Buffer(
RedisConstants.TypeLength
+ (!length.HasValue ? RedisConstants.NullLength : RedisConstants.LongValueMaximumLength)
+ RedisConstants.EndOfLineLength);
buffer.WriteByte((char)RedisMessageType.ArrayHeader);
if (!length.HasValue)
{
buffer.WriteShort(RedisConstants.Null);
}
else
{
byte[] bytes = this.NumberToBytes(length.Value);
buffer.WriteBytes(bytes);
}
buffer.WriteShort(RedisConstants.EndOfLine);
output.Add(buffer);
}
static IByteBuffer WriteString(IByteBufferAllocator allocator, RedisMessageType messageType, string content)
{
Contract.Requires(allocator != null);
IByteBuffer buffer = allocator.Buffer(
RedisConstants.TypeLength
+ Encoding.UTF8.GetMaxByteCount(content.Length)
+ RedisConstants.EndOfLineLength);
// Header
buffer.WriteByte((char)messageType);
// Content
buffer.WriteBytes(Encoding.UTF8.GetBytes(content));
// EOL
buffer.WriteShort(RedisConstants.EndOfLine);
return buffer;
}
byte[] NumberToBytes(long value)
{
byte[] bytes;
if (! this.messagePool.TryGetBytes(value, out bytes))
{
bytes = RedisCodecUtil.LongToAsciiBytes(value);
}
return bytes;
}
}
}

Просмотреть файл

@ -202,6 +202,7 @@
<Compile Include="ThreadLocalPool.cs" />
<Compile Include="PreciseTimeSpan.cs" />
<Compile Include="Utilities\AbstractConstant.cs" />
<Compile Include="Utilities\AbstractReferenceCounted.cs" />
<Compile Include="Utilities\AtomicReference.cs" />
<Compile Include="Utilities\DefaultAttributeMap.cs" />
<Compile Include="Utilities\IAttribute.cs" />

Просмотреть файл

@ -0,0 +1,80 @@

namespace DotNetty.Common.Utilities
{
using System;
using System.Diagnostics.Contracts;
using System.Threading;
public abstract class AbstractReferenceCounted : IReferenceCounted
{
int referenceCount = 1;
public int ReferenceCount => this.referenceCount;
public IReferenceCounted Retain() => this.Retain(1);
public IReferenceCounted Retain(int increment)
{
Contract.Requires(increment > 0);
return this.RetainCore(increment);
}
IReferenceCounted RetainCore(int increment)
{
while (true)
{
int count = this.referenceCount;
int nextCount = count + increment;
// Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
if (nextCount <= increment)
{
throw new InvalidOperationException($"refCnt: {count}" + (increment > 0 ? $"increment: {increment}" : $"decrement: -{increment}"));
}
if (Interlocked.CompareExchange(ref this.referenceCount, nextCount, count) == count)
{
break;
}
}
return this;
}
public IReferenceCounted Touch() => this.Touch(null);
public abstract IReferenceCounted Touch(object hint);
public bool Release() => this.Release(1);
public bool Release(int decrement)
{
Contract.Requires(decrement > 0);
return this.ReleaseCore(decrement);
}
bool ReleaseCore(int decrement)
{
while (true)
{
int count = this.referenceCount;
if (count < decrement)
{
throw new InvalidOperationException($"refCnt: {count}" + (decrement > 0 ? $"increment: {decrement}" : $"decrement: -{decrement}"));
}
if (Interlocked.CompareExchange(ref this.referenceCount, count - decrement, count) == decrement)
{
this.Deallocate();
return true;
}
return false;
}
}
protected abstract void Deallocate();
}
}

Просмотреть файл

@ -94,6 +94,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PooledBufferAllocatorTests.cs" />
<Compile Include="AbstractByteBufTest.cs" />
<Compile Include="UnreleaseableByteBufferTest.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />

Просмотреть файл

@ -0,0 +1,29 @@

namespace DotNetty.Buffers.Tests
{
using Xunit;
public class UnreleaseableByteBufferTest
{
[Fact]
public void CantRelease()
{
IByteBuffer buffer = Unpooled.UnreleasableBuffer(Unpooled.Buffer(1));
Assert.Equal(1, buffer.ReferenceCount);
Assert.False(buffer.Release());
Assert.Equal(1, buffer.ReferenceCount);
Assert.False(buffer.Release());
Assert.Equal(1, buffer.ReferenceCount);
buffer.Retain(5);
Assert.Equal(1, buffer.ReferenceCount);
buffer.Retain();
Assert.Equal(1, buffer.ReferenceCount);
Assert.True(buffer.Unwrap().Release());
Assert.Equal(0, buffer.ReferenceCount);
}
}
}

Просмотреть файл

@ -0,0 +1,123 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{258ABFFC-4AF5-4CCA-9145-929AC854D139}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>DotNetty.Codecs.Redis.Tests</RootNamespace>
<AssemblyName>DotNetty.Codecs.Redis.Tests</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<ProjectTypeGuids>{3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">10.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
<ReferencePath>$(ProgramFiles)\Common Files\microsoft shared\VSTT\$(VisualStudioVersion)\UITestExtensionPackages</ReferencePath>
<IsCodedUITest>False</IsCodedUITest>
<TestProjectType>UnitTest</TestProjectType>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="xunit.abstractions, Version=2.0.0.0, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL">
<HintPath>..\..\packages\xunit.abstractions.2.0.0\lib\net35\xunit.abstractions.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="xunit.assert, Version=2.1.0.3179, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL">
<HintPath>..\..\packages\xunit.assert.2.1.0\lib\dotnet\xunit.assert.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="xunit.core, Version=2.1.0.3179, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL">
<HintPath>..\..\packages\xunit.extensibility.core.2.1.0\lib\dotnet\xunit.core.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="xunit.execution.desktop, Version=2.1.0.3179, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL">
<HintPath>..\..\packages\xunit.extensibility.execution.2.1.0\lib\net45\xunit.execution.desktop.dll</HintPath>
<Private>True</Private>
</Reference>
</ItemGroup>
<Choose>
<When Condition="('$(VisualStudioVersion)' == '10.0' or '$(VisualStudioVersion)' == '') and '$(TargetFrameworkVersion)' == 'v3.5'">
<ItemGroup>
<Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
</ItemGroup>
</When>
<Otherwise />
</Choose>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RedisCodecTestUtil.cs" />
<Compile Include="RedisDecoderTest.cs" />
<Compile Include="RedisEncoderTests.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetty.Buffers\DotNetty.Buffers.csproj">
<Project>{5DE3C557-48BF-4CDB-9F47-474D343DD841}</Project>
<Name>DotNetty.Buffers</Name>
</ProjectReference>
<ProjectReference Include="..\..\src\DotNetty.Codecs.Redis\DotNetty.Codecs.Redis.csproj">
<Project>{6ae7cf76-971c-428b-853a-dbe73a861e60}</Project>
<Name>DotNetty.Codecs.Redis</Name>
</ProjectReference>
<ProjectReference Include="..\..\src\DotNetty.Codecs\DotNetty.Codecs.csproj">
<Project>{2abd244e-ef8f-460d-9c30-39116499e6e4}</Project>
<Name>DotNetty.Codecs</Name>
</ProjectReference>
<ProjectReference Include="..\..\src\DotNetty.Common\DotNetty.Common.csproj">
<Project>{de58fe41-5e99-44e5-86bc-fc9ed8761daf}</Project>
<Name>DotNetty.Common</Name>
</ProjectReference>
<ProjectReference Include="..\..\src\DotNetty.Transport\DotNetty.Transport.csproj">
<Project>{8218C9EE-0A4A-432F-A12A-B54202F97B05}</Project>
<Name>DotNetty.Transport</Name>
</ProjectReference>
</ItemGroup>
<Choose>
<When Condition="'$(VisualStudioVersion)' == '10.0' And '$(IsCodedUITest)' == 'True'">
<ItemGroup>
<Reference Include="Microsoft.VisualStudio.QualityTools.CodedUITestFramework, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<Private>False</Private>
</Reference>
<Reference Include="Microsoft.VisualStudio.TestTools.UITest.Common, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<Private>False</Private>
</Reference>
<Reference Include="Microsoft.VisualStudio.TestTools.UITest.Extension, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<Private>False</Private>
</Reference>
<Reference Include="Microsoft.VisualStudio.TestTools.UITesting, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<Private>False</Private>
</Reference>
</ItemGroup>
</When>
</Choose>
<Import Project="$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets" Condition="Exists('$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets')" />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

Просмотреть файл

@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("DotNetty.Codecs.Redis.Tests")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("DotNetty.Codecs.Redis.Tests")]
[assembly: AssemblyCopyright("Copyright © 2016")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("258abffc-4af5-4cca-9145-929ac854d139")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

Просмотреть файл

@ -0,0 +1,35 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Tests
{
using System;
using System.Globalization;
using System.Text;
using DotNetty.Buffers;
static class RedisCodecTestUtil
{
internal static byte[] Bytes(this IByteBuffer byteBuffer)
{
var data = new byte[byteBuffer.ReadableBytes];
byteBuffer.ReadBytes(data);
return data;
}
internal static byte[] Bytes(this long value) =>
Encoding.ASCII.GetBytes(Convert.ToString(value, CultureInfo.InvariantCulture));
internal static IByteBuffer Buffer(this long value) =>
Buffer(value.Bytes());
internal static IByteBuffer Buffer(this string value) =>
Buffer(Bytes(value));
internal static byte[] Bytes(this string value) =>
Encoding.UTF8.GetBytes(value);
internal static IByteBuffer Buffer(this byte[] data) =>
Unpooled.WrappedBuffer(data);
}
}

Просмотреть файл

@ -0,0 +1,330 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Tests
{
using System;
using System.Collections.Generic;
using System.Linq;
using DotNetty.Buffers;
using DotNetty.Codecs.Redis.Messages;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels.Embedded;
using Xunit;
public sealed class RedisDecoderTest
{
[Theory]
[InlineData("OK")]
public void DecodeSimpleString(string value)
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
Assert.False(channel.WriteInbound("+".Buffer()));
foreach (char c in value)
{
string charValue = new string(new[] { c });
Assert.False(channel.WriteInbound(charValue.Buffer()));
}
Assert.True(channel.WriteInbound("\r\n".Buffer()));
var message = channel.ReadInbound<SimpleStringRedisMessage>();
Assert.NotNull(message);
Assert.Equal(value, message.Content);
ReferenceCountUtil.Release(message);
Assert.False(channel.Finish());
}
[Fact]
public void DecodeTwoSimpleStrings()
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
Assert.False(channel.WriteInbound("+".Buffer()));
Assert.False(channel.WriteInbound("O".Buffer()));
Assert.False(channel.WriteInbound("K".Buffer()));
Assert.True(channel.WriteInbound("\r\n+SEC".Buffer()));
Assert.True(channel.WriteInbound("OND\r\n".Buffer()));
var message = channel.ReadInbound<SimpleStringRedisMessage>();
Assert.NotNull(message);
Assert.Equal("OK", message.Content);
ReferenceCountUtil.Release(message);
message = channel.ReadInbound<SimpleStringRedisMessage>();
Assert.NotNull(message);
Assert.Equal("SECOND", message.Content);
ReferenceCountUtil.Release(message);
Assert.False(channel.Finish());
}
[Theory]
[InlineData("ERROR sample message")]
public void DecodeError(string value)
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
Assert.False(channel.WriteInbound("-".Buffer()));
Assert.False(channel.WriteInbound(value.Buffer()));
Assert.False(channel.WriteInbound("\r".Buffer()));
Assert.True(channel.WriteInbound("\n".Buffer()));
var message = channel.ReadInbound<ErrorRedisMessage>();
Assert.Equal(value, message.Content);
ReferenceCountUtil.Release(message);
Assert.False(channel.Finish());
}
[Theory]
[InlineData(1234L)]
public void DecodeInteger(long value)
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
Assert.False(channel.WriteInbound(":".Buffer()));
Assert.False(channel.WriteInbound(value.Buffer()));
Assert.True(channel.WriteInbound("\r\n".Buffer()));
var message = channel.ReadInbound<IntegerRedisMessage>();
Assert.Equal(value, message.Value);
ReferenceCountUtil.Release(message);
Assert.False(channel.Finish());
}
[Theory]
[InlineData("bulk\nst", "ring\ntest\n1234")]
public void DecodeBulkString(string value1, string value2)
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
byte[] content = (value1 + value2).Bytes();
Assert.False(channel.WriteInbound("$".Buffer()));
Assert.False(channel.WriteInbound(content.Length.ToString().Buffer()));
Assert.False(channel.WriteInbound("\r\n".Buffer()));
Assert.False(channel.WriteInbound(value1.Buffer()));
Assert.False(channel.WriteInbound(value2.Buffer()));
Assert.True(channel.WriteInbound("\r\n".Buffer()));
var message = channel.ReadInbound<FullBulkStringRedisMessage>();
byte[] output = message.Content.Bytes();
Assert.Equal(content.Length, output.Length);
Assert.True(content.SequenceEqual(output));
ReferenceCountUtil.Release(message);
Assert.False(channel.Finish());
}
[Fact]
public void DecodeEmptyBulkString()
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
byte[] content = "".Bytes();
Assert.False(channel.WriteInbound("$".Buffer()));
Assert.False(channel.WriteInbound(content.Length.ToString().Buffer()));
Assert.False(channel.WriteInbound("\r\n".Buffer()));
Assert.False(channel.WriteInbound(content.Buffer()));
Assert.True(channel.WriteInbound("\r\n".Buffer()));
var message = channel.ReadInbound<IFullBulkStringRedisMessage>();
byte[] output = message.Content.Bytes();
Assert.Equal(content.Length, output.Length);
Assert.True(content.SequenceEqual(output));
ReferenceCountUtil.Release(message);
Assert.False(channel.Finish());
}
[Fact]
public void DecodeNullBulkString()
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
const long NullValue = -1;
Assert.False(channel.WriteInbound("$".Buffer()));
Assert.False(channel.WriteInbound(NullValue.Buffer()));
Assert.True(channel.WriteInbound("\r\n".Buffer()));
Assert.True(channel.WriteInbound("$".Buffer()));
Assert.True(channel.WriteInbound(NullValue.Buffer()));
Assert.True(channel.WriteInbound("\r\n".Buffer()));
var message = channel.ReadInbound<IFullBulkStringRedisMessage>();
Assert.NotNull(message);
Assert.True(message.IsNull);
ReferenceCountUtil.Release(message);
message = channel.ReadInbound<IFullBulkStringRedisMessage>();
Assert.NotNull(message);
Assert.True(message.IsNull);
ReferenceCountUtil.Release(message);
message = channel.ReadInbound<IFullBulkStringRedisMessage>();
Assert.Null(message);
Assert.False(channel.Finish());
}
[Fact]
public void DecodeSimpleArray()
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
Assert.False(channel.WriteInbound("*3\r\n".Buffer()));
Assert.False(channel.WriteInbound(":1234\r\n".Buffer()));
Assert.False(channel.WriteInbound("+sim".Buffer()));
Assert.False(channel.WriteInbound("ple\r\n-err".Buffer()));
Assert.True(channel.WriteInbound("or\r\n".Buffer()));
var message = channel.ReadInbound<ArrayRedisMessage>();
Assert.NotNull(message);
IList<IRedisMessage> children = message.Children;
Assert.NotNull(children);
Assert.Equal(3, children.Count);
Assert.IsType<IntegerRedisMessage>(children[0]);
Assert.Equal(1234L, ((IntegerRedisMessage)children[0]).Value);
Assert.IsType<SimpleStringRedisMessage>(children[1]);
Assert.Equal("simple", ((SimpleStringRedisMessage)children[1]).Content);
Assert.IsType<ErrorRedisMessage>(children[2]);
Assert.Equal("error", ((ErrorRedisMessage)children[2]).Content);
ReferenceCountUtil.Release(message);
Assert.False(channel.Finish());
}
[Fact]
public void DecodeNestedArray()
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
IByteBuffer buffer = Unpooled.Buffer();
buffer.WriteBytes("*2\r\n".Buffer());
buffer.WriteBytes("*3\r\n:1\r\n:2\r\n:3\r\n".Buffer());
buffer.WriteBytes("*2\r\n+Foo\r\n-Bar\r\n".Buffer());
Assert.True(channel.WriteInbound(buffer));
var message = channel.ReadInbound<ArrayRedisMessage>();
Assert.NotNull(message);
IList<IRedisMessage> children = message.Children;
Assert.NotNull(children);
Assert.Equal(2, children.Count);
var intArray = (ArrayRedisMessage)children[0];
var strArray = (ArrayRedisMessage)children[1];
Assert.Equal(3, intArray.Children.Count);
Assert.Equal(1L, ((IntegerRedisMessage)intArray.Children[0]).Value);
Assert.Equal(2L, ((IntegerRedisMessage)intArray.Children[1]).Value);
Assert.Equal(3L, ((IntegerRedisMessage)intArray.Children[2]).Value);
Assert.Equal(2, strArray.Children.Count);
Assert.Equal("Foo", ((SimpleStringRedisMessage)strArray.Children[0]).Content);
Assert.Equal("Bar", ((ErrorRedisMessage)strArray.Children[1]).Content);
Assert.False(channel.Finish());
}
[Fact]
public void ErrorOnDoubleReleaseArrayReferenceCounted()
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
IByteBuffer buffer = Unpooled.Buffer();
buffer.WriteBytes("*2\r\n".Buffer());
buffer.WriteBytes("*3\r\n:1\r\n:2\r\n:3\r\n".Buffer());
buffer.WriteBytes("*2\r\n+Foo\r\n-Bar\r\n".Buffer());
Assert.True(channel.WriteInbound(buffer));
var message = channel.ReadInbound<ArrayRedisMessage>();
ReferenceCountUtil.Release(message);
Assert.Throws<InvalidOperationException>(() => ReferenceCountUtil.Release(message));
Assert.False(channel.Finish());
}
[Fact]
public void ErrorOnReleaseArrayChildReferenceCounted()
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
IByteBuffer buffer = Unpooled.Buffer();
buffer.WriteBytes("*2\r\n".Buffer());
buffer.WriteBytes("*3\r\n:1\r\n:2\r\n:3\r\n".Buffer());
buffer.WriteBytes("$3\r\nFoo\r\n".Buffer());
Assert.True(channel.WriteInbound(buffer));
var message = channel.ReadInbound<ArrayRedisMessage>();
IList<IRedisMessage> children = message.Children;
ReferenceCountUtil.Release(message);
Assert.Throws<InvalidOperationException>(() => ReferenceCountUtil.Release(children[0]));
Assert.False(channel.Finish());
}
[Fact]
public void ErrorOnReleasecontentOfArrayChildReferenceCounted()
{
var channel = new EmbeddedChannel(
new RedisDecoder(),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
IByteBuffer buffer = Unpooled.Buffer();
buffer.WriteBytes("*2\r\n".Buffer());
buffer.WriteBytes("$3\r\nFoo\r\n$3\r\nBar\r\n".Buffer());
Assert.True(channel.WriteInbound(buffer));
var message = channel.ReadInbound<ArrayRedisMessage>();
IList<IRedisMessage> children = message.Children;
IByteBuffer contentBuffer = ((FullBulkStringRedisMessage)children[0]).Content;
ReferenceCountUtil.Release(message);
Assert.Throws<IllegalReferenceCountException>(() => ReferenceCountUtil.Release(contentBuffer));
Assert.False(channel.Finish());
}
}
}

Просмотреть файл

@ -0,0 +1,221 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Codecs.Redis.Tests
{
using System.Collections.Generic;
using System.Linq;
using Transport.Channels.Embedded;
using DotNetty.Buffers;
using DotNetty.Codecs.Redis.Messages;
using Xunit;
public sealed class RedisEncoderTests
{
[Theory]
[InlineData("simple")]
public void EncodeSimpleString(string value)
{
var channel = new EmbeddedChannel(new RedisEncoder());
var message = new SimpleStringRedisMessage(value);
Assert.True(channel.WriteOutbound(message));
IByteBuffer written = ReadAll(channel);
byte[] output = written.Bytes();
byte[] expected = $"+{value}\r\n".Bytes();
Assert.Equal(expected.Length, output.Length);
Assert.True(output.SequenceEqual(expected));
written.Release();
}
[Theory]
[InlineData("error1")]
public void EncodeError(string value)
{
var channel = new EmbeddedChannel(new RedisEncoder());
var message = new ErrorRedisMessage(value);
Assert.True(channel.WriteOutbound(message));
IByteBuffer written = ReadAll(channel);
byte[] output = written.Bytes();
byte[] expected = $"-{value}\r\n".Bytes();
Assert.Equal(expected.Length, output.Length);
Assert.True(output.SequenceEqual(expected));
written.Release();
}
[Theory]
[InlineData(1234L)]
public void EncodeInteger(long value)
{
var channel = new EmbeddedChannel(new RedisEncoder());
var message = new IntegerRedisMessage(value);
Assert.True(channel.WriteOutbound(message));
IByteBuffer written = ReadAll(channel);
byte[] output = written.Bytes();
byte[] expected = $":{value}\r\n".Bytes();
Assert.Equal(expected.Length, output.Length);
Assert.True(output.SequenceEqual(expected));
written.Release();
}
[Fact]
public void EncodeBulkStringContent()
{
var channel = new EmbeddedChannel(new RedisEncoder());
var header = new BulkStringHeaderRedisMessage(16);
IByteBuffer buffer1 = "bulk\nstr".Buffer();
buffer1.Retain();
var body1 = new BulkStringRedisContent(buffer1);
IByteBuffer buffer2 = "ing\ntest".Buffer();
buffer1.Retain();
var body2 = new LastBulkStringRedisContent(buffer2);
Assert.True(channel.WriteOutbound(header));
Assert.True(channel.WriteOutbound(body1));
Assert.True(channel.WriteOutbound(body2));
IByteBuffer written = ReadAll(channel);
byte[] output = written.Bytes();
byte[] expected = "$16\r\nbulk\nstring\ntest\r\n".Bytes();
Assert.Equal(expected.Length, output.Length);
Assert.True(output.SequenceEqual(expected));
written.Release();
}
[Theory]
[InlineData(@"bulk\nstring\ntest")]
public void EncodeFullBulkString(string value)
{
var channel = new EmbeddedChannel(new RedisEncoder());
// Content
IByteBuffer bulkStringBuffer = value.Buffer();
bulkStringBuffer.Retain();
int length = bulkStringBuffer.ReadableBytes;
var message = new FullBulkStringRedisMessage(bulkStringBuffer);
Assert.True(channel.WriteOutbound(message));
IByteBuffer written = ReadAll(channel);
byte[] output = written.Bytes();
byte[] expected = $"${length}\r\n{value}\r\n".Bytes();
Assert.Equal(expected.Length, output.Length);
Assert.True(output.SequenceEqual(expected));
written.Release();
}
[Fact]
public void EncodeSimpleArray()
{
var channel = new EmbeddedChannel(new RedisEncoder());
var messages = new List<IRedisMessage>();
IByteBuffer buffer = "foo".Buffer();
buffer.Retain();
messages.Add(new FullBulkStringRedisMessage(buffer));
buffer = "bar".Buffer();
buffer.Retain();
messages.Add(new FullBulkStringRedisMessage(buffer));
IRedisMessage message = new ArrayRedisMessage(messages);
Assert.True(channel.WriteOutbound(message));
IByteBuffer written = ReadAll(channel);
byte[] output = written.Bytes();
byte[] expected = "*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n".Bytes();
Assert.Equal(expected.Length, output.Length);
Assert.True(output.SequenceEqual(expected));
written.Release();
}
[Fact]
public void EncodeNullArray()
{
var channel = new EmbeddedChannel(new RedisEncoder());
IRedisMessage message = ArrayRedisMessage.Null;
Assert.True(channel.WriteOutbound(message));
IByteBuffer written = ReadAll(channel);
byte[] output = written.Bytes();
byte[] expected = "*-1\r\n".Bytes();
Assert.Equal(expected.Length, output.Length);
Assert.True(output.SequenceEqual(expected));
written.Release();
}
[Fact]
public void EncodeEmptyArray()
{
var channel = new EmbeddedChannel(new RedisEncoder());
IRedisMessage message = ArrayRedisMessage.Empty;
Assert.True(channel.WriteOutbound(message));
IByteBuffer written = ReadAll(channel);
byte[] output = written.Bytes();
byte[] expected = "*0\r\n".Bytes();
Assert.Equal(expected.Length, output.Length);
Assert.True(output.SequenceEqual(expected));
written.Release();
}
[Fact]
public void EncodeNestedArray()
{
var channel = new EmbeddedChannel(new RedisEncoder());
var grandChildren = new List<IRedisMessage>
{
new FullBulkStringRedisMessage("bar".Buffer()),
new IntegerRedisMessage(-1234L)
};
var children = new List<IRedisMessage>
{
new SimpleStringRedisMessage("foo"),
new ArrayRedisMessage(grandChildren)
};
IRedisMessage message = new ArrayRedisMessage(children);
Assert.True(channel.WriteOutbound(message));
IByteBuffer written = ReadAll(channel);
byte[] output = written.Bytes();
byte[] expected = "*2\r\n+foo\r\n*2\r\n$3\r\nbar\r\n:-1234\r\n".Bytes();
Assert.Equal(expected.Length, output.Length);
Assert.True(output.SequenceEqual(expected));
written.Release();
}
static IByteBuffer ReadAll(EmbeddedChannel channel)
{
Assert.NotNull(channel);
IByteBuffer buffer = Unpooled.Buffer();
IByteBuffer read;
while ((read = channel.ReadOutbound<IByteBuffer>()) != null)
{
buffer.WriteBytes(read);
}
return buffer;
}
}
}

Просмотреть файл

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="xunit" version="2.1.0" targetFramework="net45" />
<package id="xunit.abstractions" version="2.0.0" targetFramework="net45" />
<package id="xunit.assert" version="2.1.0" targetFramework="net45" />
<package id="xunit.core" version="2.1.0" targetFramework="net5" />
<package id="xunit.extensibility.core" version="2.1.0" targetFramework="net45" />
<package id="xunit.extensibility.execution" version="2.1.0" targetFramework="net45" />
</packages>