[Kogel.Subscribe.Mssql]SQL Server incremental subscription, database change monitoring
此框架是SQL Server增量订阅,To listen to add authorization database data changes
目前仅支持SQL Server,Nuget上可以下载安装
或者使用Nuget命令添加包
dotnet add package Kogel.Subscribe.Mssql --version 0.0.0.1(一)Define the need to monitor table entity class
/// <summary> /// /// </summary> [Display(Rename = "t_oms_order_detail")] [ElasticsearchType(RelationName = "t_oms_order_detail", IdProperty = "Id")] public class OmsOrderDetail : IBaseEntity<OmsOrderDetail, int> { /// <summary> /// /// </summary> [Identity] [Display(Rename = "id")] [Nest.PropertyName("id")] public override int Id { get; set; } /// <summary> /// /// </summary> [Display(Rename = "name")] [Nest.PropertyName("name")] public string Name { get; set; } /// <summary> /// /// </summary> [Display(Rename = "trade_id")] [Nest.PropertyName("trade_id")] public int? TradeId { get; set; } /// <summary> /// /// </summary> [Display(Rename = "descption")] [Nest.PropertyName("descption")] public string Descption { get; set; } /// <summary> /// /// </summary> [Display(Rename = "create_time")] [Nest.PropertyName("create_time")] public DateTime CreateTime { get; set; } }
[Display]和[Identity]属于Kogel.Dapper.ExtensionThe characteristics of if[想了解更多请点击],[ElasticsearchType]和[Nest.PropertyName]属于Elasticsearch特性,If not you can ignore
(二)Define table subscription/// <summary> /// Define table subscription /// </summary> public class OmsOrderDetailSubscribe : Subscribe<OmsOrderDetail> { /// <summary> /// 设置连接配置 /// </summary> /// <param name="builder"></param> public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder) { //This connection string account need to have administrator rights builder.BuildConnection("数据库连接字符串"); } }
If you need this table corresponding to more than one table can be set up
//Configure all table fragmentation builder.BuildShards(new List<string> { "t_oms_order_detail_1", "t_oms_order_detail_2", "t_oms_order_detail_3" })
(1).If you want to push subscription toRabbitMQ中
builder.BuilderRabbitMQ(new RabbitMQ.Client.ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" })
可以通过BuildTopic设置交换机名称
builder.BuildTopic("kogel_subscribe_order_detail")
(2).If you want to push subscription toKafka中
builder.BuildKafka(new ProducerConfig { BootstrapServers = "localhost:9092", Acks = Acks.None })
可以通过BuildTopic设置Topic名称
builder.BuildTopic("kogel_subscribe_order_detail")
(3).If you want to push subscription toElasticsearch中
builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail> { Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")), })
如果有设置Basic授权
builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail> { Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")) .BasicAuthentication("账号","密码") })
If you want to according to their own definition of fragmentation logic inserted into multipleESThe index can beWriteInterceptor
/// <summary> /// 设置连接配置 /// </summary> /// <param name="builder"></param> public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder) { //This connection string account need to have administrator rights builder.BuildConnection("数据库连接字符串"); //Define the pushES builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail> { Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")) .BasicAuthentication("账号", "密码"), WriteInterceptor = message => WriteInterceptor(message) }); } /// <summary> /// Define your own index logical /// </summary> /// <param name="messages"></param> /// <returns></returns> private EsSubscribeMessage<OmsOrderDetail> WriteInterceptor(SubscribeMessage<OmsOrderDetail> message) { string esIndexName; //Write their index fragmentation of business logic in here if (message.Result.Id % 3 == 0) { esIndexName = $"kogel_orders_2"; } else { esIndexName = $"kogel_orders_1"; } return message.ToEsSubscribeMessage(esIndexName); }
并且ESThe index does not exist when will dynamically create
(4).If you want to custom for subscription logic,在可以SubscribeThe subscription class rewritten
/// <summary> /// 订阅变更 (每一次sqlThe execution of would trigger aSubscribe) /// </summary> /// <param name="messageList">The message list said all affect the data changes(会受BuildLimit限制,No query to complete in the next found)</param> public override void Subscribes(List<SubscribeMessage<T>> messageList) { foreach (var message in messageList) { Console.WriteLine($"执行动作:{message.Operation},更新的表:{message.TableName},更新的id:{message.Result.GetId()}"); } }
The above subscription priority:
(三)Subscribe to startStart listening to all inherit fromSubscribe<T>的类,Execution when the application starts
ApplicationProgram.Run();
如果是基础BaseSubscribe<T>Intermediate base class needs to be defined asabstract,例如
/// <summary> /// Basic configuration class needs to be defined asabstract /// </summary> /// <typeparam name="T"></typeparam> public abstract class BaseSubscribe<T> : Subscribe<T> where T : class, IBaseEntity { }
关闭监听,Execute when the application exits
ApplicationProgram.Close();(四)其他配置
builder.BuildCdcConfig(new CdcConfig { //扫描间隔(Every time the interval of scanning change table,单位毫秒) 默认10000毫秒/10秒 ScanInterval = 10000, //Change the capture file inDB保存的时间(The default for three days) Retention = 60 * 24 * 3, //Whether the first scan table data to monitor all changing(默认false) IsFirstScanFull = false, //Every time to retrieve the amount of change(默认10条) Limit = 10, //Change the scanning the offset position(The default from the last stop start) OffsetPosition = OffsetPositionEnum.Abort })
框架开源,完整框架源码可以去Github上下载:
https://github.com/a935368322/Kogel.Subscribe.Mssql
如有问题也可以加QQ群讨论:
技术群 710217654