当前位置: 首页 > news >正文

济南可信网站北京多用户商城网站建设

济南可信网站,北京多用户商城网站建设,php 怎么做 网站 图片,镇江seo网站优化网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。 一、下载github.com/segmentio/kafka-go包 go get github.com/segmentio/kafka-go二、建立kafka连接 正常来说下面的配置host topic partition 应该写在…

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。

一、下载github.com/segmentio/kafka-go包

go get github.com/segmentio/kafka-go

二、建立kafka连接

正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9092"    //host 具体看你们自己的配置如果是服务器上的 就是服务器iP:9092 本地就是localhost:9092
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaConn() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

三、kafka之发送消息(生产者)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}

四、kafka之接收消息(消费者)

// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

五、kafka之消费者组实现消息确认(从一次消费消息的末尾开始接收消息)

只需要给读取消息的方法改变一下就可以了


func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

http://www.yayakq.cn/news/86045/

相关文章:

  • 网站开发的一般步骤企业网站的cms
  • 建网站自学德州网站建设哪一家好
  • 58同城 网站建设 推广排名住房建筑信息平台
  • 别人做的网站百度网站验证东莞市建设网站培训
  • 昆明网站建设优化企业太仓市住房和城乡建设局官方网站
  • 云主机建网站软件网站备案是什么
  • asp.net 网站运行助手九江商城网站建设
  • seo搜索引擎官网站长工具seo综合查询隐私查询
  • 高校门户网站源码如何建设万网网站
  • 网站301跳转怎么做网站建设费账务处理
  • 个人能进行网站开发自己网站
  • 微信平台APP网站建设怎么样网站建设与管理清考作业
  • 淘宝网站都是怎么做的吗wordpress目录分类设置
  • 机械制造网站互动平台有效学时
  • 网站开发服务费计入哪项费用西安网易网站建设
  • 网站安全体系建设方案平台外宣推广技巧
  • 大气的网站首页河南发现5名阳性
  • 网站建设基本模板介绍如何把网站放到空间别人可以访问
  • 网站设计面试问题南京seo
  • 辽源市住房和城乡建设局网站郑州新闻头条最新消息
  • ps软件下载网站湖北省最新消息今天
  • 建设银行网站打不开用什么浏览器上google必须翻墙吗
  • 无锡市建设安全监督网站北京科技公司名称
  • 中卫市建设网站购买虚拟机建网站
  • 网站建设费税率是多少钱手机qq空间登录网页入口
  • 专门做985招聘信息的网站入侵网站后台管理系统
  • 济南高端网站设计黄冈app下载推广价格
  • 成都中职学校网站建设推广做婚恋网站的翻译好吗
  • 广西贵港网站建设外贸公司怎么做
  • 上海省住房与城乡建设厅网站嘉定网站建设哪里好