[Kogel.Subscribe.Mssql]SQL Server incremental subscription, database change monitoring

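This framework provides incremental subscription for SQL Server. It listens for data changes (inserts, updates, and deletes) in the database.

Currently only SQL Server is supported. The package can be downloaded and installed from NuGet.

Alternatively, add the package with the NuGet command: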

dotnet add package Kogel.Subscribe.Mssql --version 0.0.0.1

(一)Define the entity class for the table to monitor
/// <summary>
/// Order detail entity, mapped to the table t_oms_order_detail.
/// </summary>
[Display(Rename = "t_oms_order_detail")]
[ElasticsearchType(RelationName = "t_oms_order_detail", IdProperty = "Id")]
public class OmsOrderDetail : IBaseEntity<OmsOrderDetail, int>
{
    /// <summary>
    /// Primary key (identity column "id").
    /// </summary>
    [Identity]
    [Display(Rename = "id")]
    [Nest.PropertyName("id")]
    public override int Id { get; set; }

    /// <summary>
    /// Column "name".
    /// </summary>
    [Display(Rename = "name")]
    [Nest.PropertyName("name")]
    public string Name { get; set; }

    /// <summary>
    /// Column "trade_id" (nullable).
    /// </summary>
    [Display(Rename = "trade_id")]
    [Nest.PropertyName("trade_id")]
    public int? TradeId { get; set; }

    /// <summary>
    /// Column "descption" (spelled as in the table).
    /// </summary>
    [Display(Rename = "descption")]
    [Nest.PropertyName("descption")]
    public string Descption { get; set; }

    /// <summary>
    /// Column "create_time".
    /// </summary>
    [Display(Rename = "create_time")]
    [Nest.PropertyName("create_time")]
    public DateTime CreateTime { get; set; }
}

[Display] and [Identity] are attributes from Kogel.Dapper.Extension. [ElasticsearchType] and [Nest.PropertyName] are Elasticsearch attributes; if you do not use them, you can ignore them.

(二)Define table subscription
/// <summary>
/// Define table subscription
/// </summary>
public class OmsOrderDetailSubscribe : Subscribe<OmsOrderDetail>
{
    /// <summary>
    /// Configure the connection settings
    /// </summary>
    /// <param name="builder"></param>
    public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder)
    {
        // The account in this connection string needs administrator rights
        builder.BuildConnection("database connection string");
    }
}

If this entity corresponds to more than one physical table (sharded tables), the shards can be configured:

// Configure all table shards
builder.BuildShards(new List<string>
{
    "t_oms_order_detail_1",
    "t_oms_order_detail_2",
    "t_oms_order_detail_3"
});
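
When there are many shard tables, the list of names could be generated rather than typed out. The following is only a minimal sketch: ShardNames and Build are hypothetical helper names that are not part of Kogel.Subscribe.Mssql, and the "_1", "_2", ... suffix convention is assumed from the example above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of Kogel.Subscribe.Mssql.
public static class ShardNames
{
    // Builds names of the form "<baseName>_1" .. "<baseName>_<count>".
    public static List<string> Build(string baseName, int count)
    {
        if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
        return Enumerable.Range(1, count)
                         .Select(i => $"{baseName}_{i}")
                         .ToList();
    }

    public static void Main()
    {
        // Prints the three table names used in the example above.
        foreach (var name in Build("t_oms_order_detail", 3))
            Console.WriteLine(name);
    }
}
```

The resulting list can then be passed to builder.BuildShards(...) in place of the literal list.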

(1).To push subscription messages to RabbitMQ

builder.BuilderRabbitMQ(new RabbitMQ.Client.ConnectionFactory
{
    HostName = "localhost",
    UserName = "guest",
    Password = "guest"
});

The exchange name can be set with BuildTopic:

builder.BuildTopic("kogel_subscribe_order_detail");

(2).To push subscription messages to Kafka

builder.BuildKafka(new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    Acks = Acks.None
});

The topic name can be set with BuildTopic:

builder.BuildTopic("kogel_subscribe_order_detail");

(3).To push subscription messages to Elasticsearch

builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>
{
    Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")),
});

If Basic authentication is configured:

builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>
{
    Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))
        .BasicAuthentication("username", "password")
});

To write into multiple ES indices according to your own sharding logic, use WriteInterceptor:

/// <summary>
/// Configure the connection settings
/// </summary>
/// <param name="builder"></param>
public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder)
{
    // The account in this connection string needs administrator rights
    builder.BuildConnection("database connection string");
    // Configure pushing to ES
    builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>
    {
        Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))
            .BasicAuthentication("username", "password"),
        WriteInterceptor = message => WriteInterceptor(message)
    });
}

/// <summary>
/// Custom index routing logic
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
private EsSubscribeMessage<OmsOrderDetail> WriteInterceptor(SubscribeMessage<OmsOrderDetail> message)
{
    string esIndexName;
    // Write your own index sharding logic here
    if (message.Result.Id % 3 == 0)
    {
        esIndexName = "kogel_orders_2";
    }
    else
    {
        esIndexName = "kogel_orders_1";
    }
    return message.ToEsSubscribeMessage(esIndexName);
}

If the ES index does not exist, it is created dynamically.
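
The routing rule inside WriteInterceptor can be kept as a small pure function so that it can be checked without a database or an ES cluster. This is a sketch only: IndexRouter and ChooseIndex are hypothetical names, not part of the framework, and the rule simply mirrors the Id % 3 example above.

```csharp
using System;

// Hypothetical helper, not part of Kogel.Subscribe.Mssql.
public static class IndexRouter
{
    // Same rule as the WriteInterceptor example: ids divisible by 3 go to
    // "kogel_orders_2", all other ids go to "kogel_orders_1".
    public static string ChooseIndex(int id) =>
        id % 3 == 0 ? "kogel_orders_2" : "kogel_orders_1";

    public static void Main()
    {
        foreach (var id in new[] { 1, 2, 3, 6, 7 })
            Console.WriteLine($"{id} -> {ChooseIndex(id)}");
    }
}
```

Inside WriteInterceptor the call would then read message.ToEsSubscribeMessage(IndexRouter.ChooseIndex(message.Result.Id)).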

(4).To implement custom subscription logic, override the method in your Subscribe subscription class

/// <summary>
/// Subscribe to changes (each SQL execution triggers one Subscribes call)
/// </summary>
/// <param name="messageList">The message list contains all affected data changes (subject to the BuildLimit limit; changes not retrieved in this query are picked up in the next one)</param>
public override void Subscribes(List<SubscribeMessage<T>> messageList)
{
    foreach (var message in messageList)
    {
        Console.WriteLine($"Operation: {message.Operation}, table: {message.TableName}, id: {message.Result.GetId()}");
    }
}

The above subscription priority:

[figure: priority of the subscription targets above; image not available]

(三)Start the subscription

Start listening for all classes that inherit from Subscribe<T>. Call this when the application starts:

ApplicationProgram.Run();

An intermediate base class such as BaseSubscribe<T> must be declared abstract, for example:

/// <summary>
/// A base configuration class must be declared abstract
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class BaseSubscribe<T> : Subscribe<T>
    where T : class, IBaseEntity
{
}

Stop listening. Call this when the application exits:

ApplicationProgram.Close();
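
In a console application, the start and stop calls can be paired so that the stop step runs even when the process is interrupted with Ctrl+C. The sketch below is an assumption-laden illustration: SubscriptionHost and RunUntilSignaled are hypothetical names, the two delegates are placeholders for ApplicationProgram.Run and ApplicationProgram.Close, and it assumes Run() returns once the listeners have been started.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of Kogel.Subscribe.Mssql.
public static class SubscriptionHost
{
    // Calls start, blocks until exitSignal is set, then calls stop
    // (stop runs even if waiting is interrupted by an exception).
    public static void RunUntilSignaled(Action start, Action stop, WaitHandle exitSignal)
    {
        start();
        try
        {
            exitSignal.WaitOne();
        }
        finally
        {
            stop();
        }
    }

    public static void Main()
    {
        using var exit = new ManualResetEvent(false);
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // keep the process alive so stop can run
            exit.Set();
        };

        // Placeholders: a real application would pass
        // ApplicationProgram.Run and ApplicationProgram.Close here.
        RunUntilSignaled(
            () => Console.WriteLine("listening started"),
            () => Console.WriteLine("listening stopped"),
            exit);
    }
}
```
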

(四)Other configuration

builder.BuildCdcConfig(new CdcConfig
{
    // Scan interval (time between scans of the change table), in milliseconds; default 10000 ms (10 seconds)
    ScanInterval = 10000,
    // How long captured change data is kept in the DB, in minutes (default: three days)
    Retention = 60 * 24 * 3,
    // Whether the first scan covers the full table data so that all changes are monitored (default false)
    IsFirstScanFull = false,
    // Number of changes retrieved per scan (default 10)
    Limit = 10,
    // Offset position for change scanning (default: resume from where the last run stopped)
    OffsetPosition = OffsetPositionEnum.Abort
});
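
The two numeric settings use different units: ScanInterval is in milliseconds, and Retention appears to be in minutes (60 * 24 * 3 is three days in minutes). A small sketch of deriving both values from TimeSpan, which may reduce unit mistakes; CdcDurations and its methods are hypothetical names, not part of the framework.

```csharp
using System;

// Hypothetical helper, not part of Kogel.Subscribe.Mssql.
public static class CdcDurations
{
    // Converts a retention period to whole minutes (throws on overflow).
    public static int ToRetentionMinutes(TimeSpan retention) =>
        checked((int)retention.TotalMinutes);

    // Converts a scan interval to whole milliseconds (throws on overflow).
    public static int ToScanIntervalMilliseconds(TimeSpan interval) =>
        checked((int)interval.TotalMilliseconds);

    public static void Main()
    {
        Console.WriteLine(ToRetentionMinutes(TimeSpan.FromDays(3)));             // 4320, same as 60 * 24 * 3
        Console.WriteLine(ToScanIntervalMilliseconds(TimeSpan.FromSeconds(10))); // 10000
    }
}
```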

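The framework is open source. The complete source code can be downloaded from GitHub:

https://github.com/a935368322/Kogel.Subscribe.Mssql

If you have questions, you can also join the QQ group to discuss:

Technical group: 710217654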
