1
This commit is contained in:
255
iMES.Core/KafkaManager/Service/KafkaConsumer.cs
Normal file
255
iMES.Core/KafkaManager/Service/KafkaConsumer.cs
Normal file
@@ -0,0 +1,255 @@
|
||||
//using Confluent.Kafka;
|
||||
//using System;
|
||||
//using System.Collections.Generic;
|
||||
//using System.Text;
|
||||
//using System.Threading.Tasks;
|
||||
//using iMES.Core.Configuration;
|
||||
//using iMES.Core.Enums;
|
||||
//using iMES.Core.KafkaManager.IService;
|
||||
//using iMES.Core.Services;
|
||||
|
||||
//namespace iMES.Core.KafkaManager.Service
|
||||
//{
|
||||
// /// <summary>
|
||||
// /// 消费者 (Message.Key的数据类型为string、Message.Value的数据类型为string)
|
||||
// /// 消费者实现三种消费方式:1.订阅回调模式 2.批量消费模式 3.单笔消费模式
|
||||
// /// </summary>
|
||||
// /// <typeparam name="TKey">Message.Key 的数据类型</typeparam>
|
||||
// /// <typeparam name="TValue">Message.Value 的数据类型</typeparam>
|
||||
// public class KafkaConsumer<TKey, TValue> : KafkaConfig, IKafkaConsumer<TKey, TValue>
|
||||
// {
|
||||
// /// <summary>
|
||||
// /// Kafka地址(包含端口号)
|
||||
// /// </summary>
|
||||
// public string Servers
|
||||
// {
|
||||
// get
|
||||
// {
|
||||
// return ConsumerConfig.BootstrapServers;
|
||||
// }
|
||||
// set
|
||||
// {
|
||||
// ConsumerConfig.BootstrapServers = value;
|
||||
// }
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// 消费者群组
|
||||
// /// </summary>
|
||||
// public string GroupId
|
||||
// {
|
||||
// get
|
||||
// {
|
||||
// return ConsumerConfig.GroupId;
|
||||
// }
|
||||
// set
|
||||
// {
|
||||
// ConsumerConfig.GroupId = value;
|
||||
// }
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// 自动提交 默认为 false
|
||||
// /// </summary>
|
||||
// public bool EnableAutoCommit
|
||||
// {
|
||||
// get
|
||||
// {
|
||||
// return ConsumerConfig.EnableAutoCommit ?? false;
|
||||
// }
|
||||
// set
|
||||
// {
|
||||
// ConsumerConfig.EnableAutoCommit = value;
|
||||
// }
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// 订阅回调模式-消费(持续订阅)
|
||||
// /// </summary>
|
||||
// /// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
|
||||
// /// <param name="Topic">主题</param>
|
||||
// public void Consume(Func<ConsumeResult<TKey, TValue>, bool> Func, string Topic)
|
||||
// {
|
||||
// Task.Factory.StartNew(() =>
|
||||
// {
|
||||
// var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
|
||||
// //设置反序列化方式
|
||||
// builder.SetValueDeserializer(new KafkaDConverter<TValue>());
|
||||
// builder.SetErrorHandler((_, e) =>
|
||||
// {
|
||||
// Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
|
||||
// }).SetStatisticsHandler((_, json) =>
|
||||
// {
|
||||
// Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
|
||||
// }).SetPartitionsAssignedHandler((c, partitions) =>
|
||||
// {
|
||||
// string partitionsStr = string.Join(", ", partitions);
|
||||
// Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
|
||||
// }).SetPartitionsRevokedHandler((c, partitions) =>
|
||||
// {
|
||||
// string partitionsStr = string.Join(", ", partitions);
|
||||
// Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
|
||||
// });
|
||||
// using var consumer = builder.Build();
|
||||
// consumer.Subscribe(Topic);
|
||||
// while (AppSetting.Kafka.IsConsumerSubscribe) //true
|
||||
// {
|
||||
// ConsumeResult<TKey, TValue> result = null;
|
||||
// try
|
||||
// {
|
||||
// result = consumer.Consume();
|
||||
// if (result.IsPartitionEOF) continue;
|
||||
// if (Func(result))
|
||||
// {
|
||||
// if (!(bool)ConsumerConfig.EnableAutoCommit)
|
||||
// {
|
||||
// //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
|
||||
// consumer.Commit(result);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// catch (ConsumeException ex)
|
||||
// {
|
||||
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
|
||||
// }
|
||||
// catch (Exception ex)
|
||||
// {
|
||||
// Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
|
||||
// }
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// 批量订阅回调模式-消费(持续订阅)
|
||||
// /// </summary>
|
||||
// /// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
|
||||
// /// <param name="Topic">主题</param>
|
||||
// public void ConsumeBatch(Func<ConsumeResult<TKey, TValue>, bool> Func, List<string> Topics)
|
||||
// {
|
||||
// Task.Factory.StartNew(() =>
|
||||
// {
|
||||
// var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
|
||||
// //设置反序列化方式
|
||||
// builder.SetValueDeserializer(new KafkaDConverter<TValue>());
|
||||
// builder.SetErrorHandler((_, e) =>
|
||||
// {
|
||||
// Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
|
||||
// }).SetStatisticsHandler((_, json) =>
|
||||
// {
|
||||
// Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
|
||||
// }).SetPartitionsAssignedHandler((c, partitions) =>
|
||||
// {
|
||||
// string partitionsStr = string.Join(", ", partitions);
|
||||
// Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
|
||||
// }).SetPartitionsRevokedHandler((c, partitions) =>
|
||||
// {
|
||||
// string partitionsStr = string.Join(", ", partitions);
|
||||
// Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
|
||||
// });
|
||||
// using var consumer = builder.Build();
|
||||
// consumer.Subscribe(Topics);
|
||||
// while (AppSetting.Kafka.IsConsumerSubscribe) //true
|
||||
// {
|
||||
// ConsumeResult<TKey, TValue> result = null;
|
||||
// try
|
||||
// {
|
||||
// result = consumer.Consume();
|
||||
// if (result.IsPartitionEOF) continue;
|
||||
// if (Func(result))
|
||||
// {
|
||||
// if (!(bool)ConsumerConfig.EnableAutoCommit)
|
||||
// {
|
||||
// //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
|
||||
// consumer.Commit(result);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// catch (ConsumeException ex)
|
||||
// {
|
||||
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topics.ToArray()},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
|
||||
// }
|
||||
// catch (Exception ex)
|
||||
// {
|
||||
// Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
|
||||
// }
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)
|
||||
// /// </summary>
|
||||
// /// <param name="Topic">主题</param>
|
||||
// /// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
|
||||
// /// <param name="MaxRow">最多单次消费行数 默认值:100行</param>
|
||||
// /// <returns>待消费数据</returns>
|
||||
// public List<ConsumeResult<TKey, TValue>> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100)
|
||||
// {
|
||||
// var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
|
||||
// //设置反序列化方式
|
||||
// builder.SetValueDeserializer(new KafkaDConverter<TValue>());
|
||||
// using var consumer = builder.Build();
|
||||
// consumer.Subscribe(Topic);
|
||||
// List<ConsumeResult<TKey, TValue>> Res = new List<ConsumeResult<TKey, TValue>>();
|
||||
// while (true)
|
||||
// {
|
||||
// try
|
||||
// {
|
||||
// var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
|
||||
// if (result == null) break;
|
||||
// else
|
||||
// {
|
||||
// Res.Add(result);
|
||||
// //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
|
||||
// consumer.Commit();
|
||||
// }
|
||||
// if (Res.Count > MaxRow) break;
|
||||
// }
|
||||
// catch (Exception ex)
|
||||
// {
|
||||
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
|
||||
// return null;
|
||||
// }
|
||||
// }
|
||||
// return Res;
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// 单笔消费模式-单行消费
|
||||
// /// </summary>
|
||||
// /// <param name="Topic">主题</param>
|
||||
// /// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
|
||||
// /// <returns>待消费数据</returns>
|
||||
// public ConsumeResult<TKey, TValue> ConsumeOneRow(string Topic, int TimeOut = 300)
|
||||
// {
|
||||
// var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
|
||||
// //设置反序列化方式
|
||||
// builder.SetValueDeserializer(new KafkaDConverter<TValue>());
|
||||
// using var consumer = builder.Build();
|
||||
// consumer.Subscribe(Topic);
|
||||
// try
|
||||
// {
|
||||
// var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
|
||||
// if (result != null)
|
||||
// {
|
||||
// //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
|
||||
// consumer.Commit();
|
||||
// }
|
||||
// return result;
|
||||
// }
|
||||
// catch (Exception ex)
|
||||
// {
|
||||
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
|
||||
// return null;
|
||||
// }
|
||||
// }
|
||||
|
||||
// public void Dispose()
|
||||
// {
|
||||
// //if (_cache != null)
|
||||
// // _cache.Dispose();
|
||||
// GC.SuppressFinalize(this);
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
Reference in New Issue
Block a user