kafka搭建相关:https://www.jianshu.com/p/79e29f653836
1.向kafka写消息
安装nuget包 confluent.Kafka
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Hello World!");
while (true)
{
Console.WriteLine("请输入发送的内容");
string content = Console.ReadLine();
//kafka节点
string brokerList = " 192.168.101.13:9092";
await ConfluentKafka.PublishAsync(brokerList, "test", content);
}
}
}
public static async Task PublishAsync(string brokerList,string topicName,string content)
{
//生产者配置
var config = new ProducerConfig
{
BootstrapServers = brokerList, //kafka节点
Acks = Acks.None //ack机制,0不等服务器确认,1主节点确认返回ack,-1全部节点同步完返回ack
};
using (var producer = new ProducerBuilder<string, string>(config)
.Build())
{
try
{
//key要给值,根据key做负载均衡,不然如果多节点,key不给值会全部写有一个分区
var deliveryReport = await producer.
ProduceAsync(
topicName, new Message<string, string> { Key = (new Random().Next(1, 10)).ToString(), Value = content });
Console.WriteLine($"向kafka发送了数据: {deliveryReport.TopicPartitionOffset}");
}
catch(ProduceException<string, string> e)
{
Console.WriteLine($"向kafka发送数据失败:{e.Message}");
}
}
}
image.png
查看kafka可以化工具看到产生一个test的topic,下面只有一个分片,里面已经写进了一条数据,具体内容被编码了看不了
image.png
2.读kafka消息
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Hello World!");
//kafka节点
string brokerList = " 192.168.101.13:9092";
string topicName = "test";
ConfluentKafka.Consumer(brokerList, topicName, "G1");
await Task.CompletedTask;
}
}
public static void Consumer(string brokerList, string topicName, string groupId)
{
//消费配置
var config = new ConsumerConfig
{
BootstrapServers = brokerList,
GroupId = groupId,//客户端组id
EnableAutoCommit = true,//处理完自动提交
//Latest = 0,表示消费者消费启动之后的数据。。。启动之前的数据消费不到
//Earliest = 1,每次都从头开始消费没有消费过的数据
AutoOffsetReset = AutoOffsetReset.Earliest,
//代表数据超过了6000没有处理完业务,则把数据给其他消费端
// 一定要注意。。SessionTimeoutMs值一定要小于MaxPollIntervalMs
SessionTimeoutMs = 6000,
MaxPollIntervalMs = 10000,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
//新加入消费者的时候调用
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
})
.Build())
{
//消费者会影响在平衡分区,当同一个组新加入消费者时,分区会在分配
consumer.Subscribe(topicName);
try
{
//循环摘取模式
while(true)
{
try
{
var consumeResult = consumer.Consume();
//摘取到底了
if (consumeResult.IsPartitionEOF)
{
continue;
}
Console.WriteLine($": {consumeResult.TopicPartitionOffset}::{consumeResult.Message.Value}");
try
{
// 提交偏移量,数据已经处理完成了
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"Commit error: {e.Error.Reason}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Closing consumer.");
consumer.Close();
}
}
}
image.png
image.png
网友评论