redis发布订阅
相比rabbitmq等专业消息队列的缺陷: 没有相应的机制保证消息的可靠消费,如果发布者发布一条消息,而没有对应的订阅者的话,这条消息将丢失,不会存在内存中。
package main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/gomodule/redigo/redis"
)
// ConsumeFunc consumes message at the channel.
type ConsumeFunc func(channel string, message []byte) error
// RedisClient represents a redis client with connection pool.
type RedisClient struct {
pool *redis.Pool
}
// NewRedisClient returns a RedisClient.
func NewRedisClient(addr string, passwd string) *RedisClient {
pool := &redis.Pool{
MaxIdle: 10,
IdleTimeout: 300 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(0))
if err != nil {
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
}
log.Printf("new redis pool at %s", addr)
client := &RedisClient{
pool: pool,
}
return client
}
// Close closes connection pool.
func (r *RedisClient) Close() error {
err := r.pool.Close()
return err
}
// Publish publishes message to channel.
func (r *RedisClient) Publish(channel, message string) (int, error) {
c := r.pool.Get()
defer c.Close()
n, err := redis.Int(c.Do("PUBLISH", channel, message))
if err != nil {
return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
}
return n, nil
}
// Subscribe subscribes messages at the channels.
func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
psc := redis.PubSubConn{Conn: r.pool.Get()}
log.Printf("redis pubsub subscribe channel: %v", channel)
if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
return err
}
done := make(chan error, 1)
// start a new goroutine to receive message
go func() {
defer psc.Close()
for {
switch msg := psc.Receive().(type) {
case error:
done <- fmt.Errorf("redis pubsub receive err: %v", msg)
return
case redis.Message:
if err := consume(msg.Channel, msg.Data); err != nil {
done <- err
return
}
case redis.Subscription:
if msg.Count == 0 {
// all channels are unsubscribed
done <- nil
return
}
}
}
}()
ch <- 0
// health check
tick := time.NewTicker(time.Minute)
defer tick.Stop()
for {
select {
case <-ctx.Done():
if err := psc.Unsubscribe(); err != nil {
return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
}
return nil
case err := <-done:
return err
case <-tick.C:
if err := psc.Ping(""); err != nil {
return err
}
log.Println("over")
return nil
}
}
}
func myConsumer(channel string, message []byte) error {
log.Printf("receive message[%s] at the channel[%s]\n", string(message), channel)
return nil
}
// ch 用于保证发布线程在订阅线程启动成功后才开始发布消息
var ch = make(chan int)
func main() {
redisClient := NewRedisClient("127.0.0.1:6300", "zyhua1122")
defer redisClient.Close()
go func() {
var subscriber int
<-ch
for i := 0; i < 3; i++ {
subscriber, _ = redisClient.Publish("testx", "hello world"+strconv.Itoa(i))
log.Printf("there is %d subscriber.\n", subscriber)
}
}()
ctx, cancel := context.WithCancel(context.Background())
err := redisClient.Subscribe(ctx,
func(channel string, message []byte) error {
log.Printf("receive message[%s] at the channel[%s]\n", string(message), channel)
if string(message) == "goodbye" {
cancel()
}
return nil
},
"testx")
if err != nil {
fmt.Printf("get error: %v\n", err)
}
return
}