diff --git a/go.mod b/go.mod index 8e7ce2f..2aa8f8d 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,13 @@ module rebeam -go 1.17 +go 1.20 require ( - github.com/go-redis/redis/v8 v8.11.4 + github.com/go-redis/redis/v8 v8.11.5 github.com/nsqio/go-diskqueue v1.1.0 ) require ( - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect ) diff --git a/go.sum b/go.sum index 0d0b942..4828a8f 100644 --- a/go.sum +++ b/go.sum @@ -1,99 +1,19 @@ github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= -github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/nsqio/go-diskqueue v1.1.0 h1:r0dJ0DMXT3+2mOq+79cvCjnhoBxyGC2S9O+OjQrpe4Q= github.com/nsqio/go-diskqueue v1.1.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI= -github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= -github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= -github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= -github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= -github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= -github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= -golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/main.go b/main.go index 0349f5a..939d597 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "math/rand" "net/http" _ "net/http/pprof" "os" @@ -27,6 +28,13 @@ func l(_ dq.LogLevel, f string, args ...interface{}) { log.Printf(f, args...) } +type Core struct { + DataDir string + MainClient *redis.Client + Offloaders map[string]*Offloader + MigrationMode bool +} + type ProjectRedisConfig struct { Host string `json:"host"` Pass string `json:"pass"` @@ -53,6 +61,7 @@ type Offloader struct { Queues map[string]dq.Interface Sets map[string]string Name string + Core *Core } func (that *Offloader) CleanName(s string) string { @@ -108,7 +117,7 @@ func (that *Offloader) RefreshQueues() error { needQueueMap[queueName] = true if _, has := that.Queues[queueName]; !has { log.Printf("opening queue %s for %s:%s", queueName, that.Name, setName) - queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), dataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l) + queue := dq.New(fmt.Sprintf("%s:%s", that.Name, that.CleanName(queueName)), that.Core.DataDir, 128*1024*1024, 0, 128*1024*1024, 1_000_000, 5*time.Second, l) if queue == nil { return fmt.Errorf("unable to open disk queue %s:%s (dq.New()==nil)", that.Name, that.CleanName(queueName)) } @@ -138,7 +147,10 @@ func (that *Offloader) UpdateStats() { if k != q { continue } - hset[k] = fmt.Sprintf("%d", that.Queues[q].Depth()) + queue := that.Queues[q] + if queue != nil { + hset[k] = fmt.Sprintf("%d", queue.Depth()) + } } _, err := that.RedisClient.HSet(that.Context, fmt.Sprintf("%s:offloaded", that.Name), hset).Result() if err != nil { @@ -273,7 +285,7 @@ func (that *Offloader) Do() { log.Printf("unable to scard %s: %s", key, err) continue } - if scard > watermarkHigh || (k != q && scard > 0) { + if !that.Core.MigrationMode && (scard > watermarkHigh || (k != q && scard > 0)) { spopLimit := scard - watermarkHigh if k != q || spopLimit > batchSize { spopLimit = batchSize @@ -295,9 +307,9 @@ func (that *Offloader) Do() { } } rerun = true - } else if k == q && scard < watermarkLow && that.Queues[q].Depth() > 0 { + } else if k == q && ((that.Core.MigrationMode && scard < watermarkHigh*2) || scard < watermarkLow) && that.Queues[q].Depth() > 0 { spopLimit := watermarkLow - scard - if spopLimit > batchSize { + if spopLimit > batchSize || that.Core.MigrationMode { spopLimit = batchSize } var entries []interface{} @@ -350,11 +362,9 @@ func (that *Offloader) Do() { } } -var offloaders = map[string]*Offloader{} - -func StopProjects() { +func (that *Core) StopProjects() { var doneChans []chan bool - for project, offloader := range offloaders { + for project, offloader := range that.Offloaders { log.Printf("stopping offloader %s", project) offloader.Cancel() doneChans = append(doneChans, offloader.Done) @@ -364,7 +374,7 @@ func StopProjects() { } } -func RefreshProjects(redisClient *redis.Client) { +func (that *Core) RefreshProjects(redisClient *redis.Client) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) res, err := redisClient.HGetAll(ctx, "trackers").Result() cancel() @@ -385,7 +395,7 @@ func RefreshProjects(redisClient *redis.Client) { } updatedProjects[project] = config } - for project, offloader := range offloaders { + for project, offloader := range that.Offloaders { _, stopRequired := updatedProjects[project] stopRequired = !stopRequired if !stopRequired { @@ -407,15 +417,16 @@ func RefreshProjects(redisClient *redis.Client) { log.Printf("stopping offloader %s", project) offloader.Cancel() <-offloader.Done - delete(offloaders, project) + delete(that.Offloaders, project) } } for project, config := range updatedProjects { - if _, has := offloaders[project]; !has { + if _, has := that.Offloaders[project]; !has { log.Printf("starting offloader %s", project) offloader := &Offloader{} offloader.Name = project offloader.ProjectConfig = config + offloader.Core = that if config.RedisConfig != nil { offloader.RedisClient = redis.NewClient(&redis.Options{ Addr: fmt.Sprintf("%s:%d", config.RedisConfig.Host, config.RedisConfig.Port), @@ -428,40 +439,61 @@ func RefreshProjects(redisClient *redis.Client) { } offloader.Context, offloader.Cancel = context.WithCancel(context.Background()) offloader.Done = make(chan bool) - offloaders[project] = offloader + that.Offloaders[project] = offloader go offloader.Do() } } } -var dataDir string - func main() { + rand.Seed(time.Now().UnixNano()) log.SetFlags(log.Flags() | log.Lshortfile) go func() { - if err := http.ListenAndServe("127.0.0.1:16992", nil); err != nil { - log.Printf("webserver error: %s", err) + var err error + for i := 0; i < 10; i++ { + port := rand.Intn(65535-1024) + 1024 + err = http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", port), nil) + if err != nil && err != http.ErrServerClosed { + log.Printf("unable to listen on port %d: %s", port, err) + continue + } + break } }() - dataDir = os.Getenv("DATA_DIR") + + dataDir := os.Getenv("DATA_DIR") if dataDir == "" { log.Panicf("no DATA_DIR specified") } + // check if dataDir exists and is a directory + if stat, err := os.Stat(dataDir); os.IsNotExist(err) { + log.Panicf("DATA_DIR %s does not exist", dataDir) + } else if !stat.IsDir() { + log.Panicf("DATA_DIR %s is not a directory", dataDir) + } + mainOptions, err := redis.ParseURL(os.Getenv("REDIS_URL")) if err != nil { log.Panicf("%s", err) } mainOptions.ReadTimeout = 15 * time.Minute mainClient := redis.NewClient(mainOptions) + sc := make(chan os.Signal, 1) signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill) ticker := time.NewTicker(1 * time.Minute) - defer StopProjects() + core := &Core{ + DataDir: dataDir, + MainClient: mainClient, + Offloaders: map[string]*Offloader{}, + MigrationMode: os.Getenv("MIGRATION_MODE") != "", + } + defer core.StopProjects() for { - RefreshProjects(mainClient) + core.RefreshProjects(mainClient) select { case <-sc: - StopProjects() + core.StopProjects() return case <-ticker.C: } diff --git a/vendor/github.com/go-redis/redis/v8/.golangci.yml b/vendor/github.com/go-redis/redis/v8/.golangci.yml index 53ce42a..de51455 100644 --- a/vendor/github.com/go-redis/redis/v8/.golangci.yml +++ b/vendor/github.com/go-redis/redis/v8/.golangci.yml @@ -2,26 +2,3 @@ run: concurrency: 8 deadline: 5m tests: false -linters: - enable-all: true - disable: - - funlen - - gochecknoglobals - - gochecknoinits - - gocognit - - goconst - - godox - - gosec - - maligned - - wsl - - gomnd - - goerr113 - - exhaustive - - nestif - - nlreturn - - exhaustivestruct - - wrapcheck - - errorlint - - cyclop - - forcetypeassert - - forbidigo diff --git a/vendor/github.com/go-redis/redis/v8/CHANGELOG.md b/vendor/github.com/go-redis/redis/v8/CHANGELOG.md index c575c56..195e519 100644 --- a/vendor/github.com/go-redis/redis/v8/CHANGELOG.md +++ b/vendor/github.com/go-redis/redis/v8/CHANGELOG.md @@ -1,3 +1,31 @@ +## [8.11.5](https://github.com/go-redis/redis/compare/v8.11.4...v8.11.5) (2022-03-17) + + +### Bug Fixes + +* add missing Expire methods to Cmdable ([17e3b43](https://github.com/go-redis/redis/commit/17e3b43879d516437ada71cf9c0deac6a382ed9a)) +* add whitespace for avoid unlikely colisions ([7f7c181](https://github.com/go-redis/redis/commit/7f7c1817617cfec909efb13d14ad22ef05a6ad4c)) +* example/otel compile error ([#2028](https://github.com/go-redis/redis/issues/2028)) ([187c07c](https://github.com/go-redis/redis/commit/187c07c41bf68dc3ab280bc3a925e960bbef6475)) +* **extra/redisotel:** set span.kind attribute to client ([065b200](https://github.com/go-redis/redis/commit/065b200070b41e6e949710b4f9e01b50ccc60ab2)) +* format ([96f53a0](https://github.com/go-redis/redis/commit/96f53a0159a28affa94beec1543a62234e7f8b32)) +* invalid type assert in stringArg ([de6c131](https://github.com/go-redis/redis/commit/de6c131865b8263400c8491777b295035f2408e4)) +* rename Golang to Go ([#2030](https://github.com/go-redis/redis/issues/2030)) ([b82a2d9](https://github.com/go-redis/redis/commit/b82a2d9d4d2de7b7cbe8fcd4895be62dbcacacbc)) +* set timeout for WAIT command. Fixes [#1963](https://github.com/go-redis/redis/issues/1963) ([333fee1](https://github.com/go-redis/redis/commit/333fee1a8fd98a2fbff1ab187c1b03246a7eb01f)) +* update some argument counts in pre-allocs ([f6974eb](https://github.com/go-redis/redis/commit/f6974ebb5c40a8adf90d2cacab6dc297f4eba4c2)) + + +### Features + +* Add redis v7's NX, XX, GT, LT expire variants ([e19bbb2](https://github.com/go-redis/redis/commit/e19bbb26e2e395c6e077b48d80d79e99f729a8b8)) +* add support for acl sentinel auth in universal client ([ab0ccc4](https://github.com/go-redis/redis/commit/ab0ccc47413f9b2a6eabc852fed5005a3ee1af6e)) +* add support for COPY command ([#2016](https://github.com/go-redis/redis/issues/2016)) ([730afbc](https://github.com/go-redis/redis/commit/730afbcffb93760e8a36cc06cfe55ab102b693a7)) +* add support for passing extra attributes added to spans ([39faaa1](https://github.com/go-redis/redis/commit/39faaa171523834ba527c9789710c4fde87f5a2e)) +* add support for time.Duration write and scan ([2f1b74e](https://github.com/go-redis/redis/commit/2f1b74e20cdd7719b2aecf0768d3e3ae7c3e781b)) +* **redisotel:** ability to override TracerProvider ([#1998](https://github.com/go-redis/redis/issues/1998)) ([bf8d4aa](https://github.com/go-redis/redis/commit/bf8d4aa60c00366cda2e98c3ddddc8cf68507417)) +* set net.peer.name and net.peer.port in otel example ([69bf454](https://github.com/go-redis/redis/commit/69bf454f706204211cd34835f76b2e8192d3766d)) + + + ## [8.11.4](https://github.com/go-redis/redis/compare/v8.11.3...v8.11.4) (2021-10-04) diff --git a/vendor/github.com/go-redis/redis/v8/README.md b/vendor/github.com/go-redis/redis/v8/README.md index 9c27241..f3b6a01 100644 --- a/vendor/github.com/go-redis/redis/v8/README.md +++ b/vendor/github.com/go-redis/redis/v8/README.md @@ -1,19 +1,16 @@ -

- - All-in-one tool to optimize performance and monitor errors & logs - -

- -# Redis client for Golang +# Redis client for Go ![build workflow](https://github.com/go-redis/redis/actions/workflows/build.yml/badge.svg) [![PkgGoDev](https://pkg.go.dev/badge/github.com/go-redis/redis/v8)](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc) [![Documentation](https://img.shields.io/badge/redis-documentation-informational)](https://redis.uptrace.dev/) -[![Chat](https://discordapp.com/api/guilds/752070105847955518/widget.png)](https://discord.gg/rWtp5Aj) -- To ask questions, join [Discord](https://discord.gg/rWtp5Aj) or use - [Discussions](https://github.com/go-redis/redis/discussions). -- [Newsletter](https://blog.uptrace.dev/pages/newsletter.html) to get latest updates. +go-redis is brought to you by :star: [**uptrace/uptrace**](https://github.com/uptrace/uptrace). +Uptrace is an open source and blazingly fast **distributed tracing** backend powered by +OpenTelemetry and ClickHouse. Give it a star as well! + +## Resources + +- [Discussions](https://github.com/go-redis/redis/discussions) - [Documentation](https://redis.uptrace.dev) - [Reference](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc) - [Examples](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#pkg-examples) @@ -22,15 +19,14 @@ Other projects you may like: - [Bun](https://bun.uptrace.dev) - fast and simple SQL client for PostgreSQL, MySQL, and SQLite. -- [treemux](https://github.com/vmihailenco/treemux) - high-speed, flexible, tree-based HTTP router - for Go. +- [BunRouter](https://bunrouter.uptrace.dev/) - fast and flexible HTTP router for Go. ## Ecosystem -- [Redis Mock](https://github.com/go-redis/redismock). -- [Distributed Locks](https://github.com/bsm/redislock). -- [Redis Cache](https://github.com/go-redis/cache). -- [Rate limiting](https://github.com/go-redis/redis_rate). +- [Redis Mock](https://github.com/go-redis/redismock) +- [Distributed Locks](https://github.com/bsm/redislock) +- [Redis Cache](https://github.com/go-redis/cache) +- [Rate limiting](https://github.com/go-redis/redis_rate) ## Features @@ -39,16 +35,16 @@ Other projects you may like: [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support. - [Pub/Sub](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#PubSub). - [Transactions](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#example-Client-TxPipeline). -- [Pipeline](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#example-Client-Pipeline) and - [TxPipeline](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#example-Client-TxPipeline). +- [Pipeline](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#example-Client.Pipeline) and + [TxPipeline](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#example-Client.TxPipeline). - [Scripting](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#Script). - [Timeouts](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#Options). - [Redis Sentinel](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#NewFailoverClient). - [Redis Cluster](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#NewClusterClient). -- [Cluster of Redis Servers](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#example-NewClusterClient--ManualSetup) +- [Cluster of Redis Servers](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#example-NewClusterClient-ManualSetup) without using cluster mode and Redis Sentinel. - [Ring](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#NewRing). -- [Instrumentation](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#ex-package--Instrumentation). +- [Instrumentation](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#example-package-Instrumentation). ## Installation @@ -72,6 +68,7 @@ go get github.com/go-redis/redis/v8 import ( "context" "github.com/go-redis/redis/v8" + "fmt" ) var ctx = context.Background() diff --git a/vendor/github.com/go-redis/redis/v8/command.go b/vendor/github.com/go-redis/redis/v8/command.go index 0079b59..4bb12a8 100644 --- a/vendor/github.com/go-redis/redis/v8/command.go +++ b/vendor/github.com/go-redis/redis/v8/command.go @@ -20,7 +20,7 @@ type Cmder interface { String() string stringArg(int) string firstKeyPos() int8 - setFirstKeyPos(int8) + SetFirstKeyPos(int8) readTimeout() *time.Duration readReply(rd *proto.Reader) error @@ -151,15 +151,21 @@ func (cmd *baseCmd) stringArg(pos int) string { if pos < 0 || pos >= len(cmd.args) { return "" } - s, _ := cmd.args[pos].(string) - return s + arg := cmd.args[pos] + switch v := arg.(type) { + case string: + return v + default: + // TODO: consider using appendArg + return fmt.Sprint(v) + } } func (cmd *baseCmd) firstKeyPos() int8 { return cmd.keyPos } -func (cmd *baseCmd) setFirstKeyPos(keyPos int8) { +func (cmd *baseCmd) SetFirstKeyPos(keyPos int8) { cmd.keyPos = keyPos } diff --git a/vendor/github.com/go-redis/redis/v8/commands.go b/vendor/github.com/go-redis/redis/v8/commands.go index c76242d..bbfe089 100644 --- a/vendor/github.com/go-redis/redis/v8/commands.go +++ b/vendor/github.com/go-redis/redis/v8/commands.go @@ -96,6 +96,10 @@ type Cmdable interface { Exists(ctx context.Context, keys ...string) *IntCmd Expire(ctx context.Context, key string, expiration time.Duration) *BoolCmd ExpireAt(ctx context.Context, key string, tm time.Time) *BoolCmd + ExpireNX(ctx context.Context, key string, expiration time.Duration) *BoolCmd + ExpireXX(ctx context.Context, key string, expiration time.Duration) *BoolCmd + ExpireGT(ctx context.Context, key string, expiration time.Duration) *BoolCmd + ExpireLT(ctx context.Context, key string, expiration time.Duration) *BoolCmd Keys(ctx context.Context, pattern string) *StringSliceCmd Migrate(ctx context.Context, host, port, key string, db int, timeout time.Duration) *StatusCmd Move(ctx context.Context, key string, db int) *BoolCmd @@ -139,6 +143,7 @@ type Cmdable interface { SetXX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd SetRange(ctx context.Context, key string, offset int64, value string) *IntCmd StrLen(ctx context.Context, key string) *IntCmd + Copy(ctx context.Context, sourceKey string, destKey string, db int, replace bool) *IntCmd GetBit(ctx context.Context, key string, offset int64) *IntCmd SetBit(ctx context.Context, key string, offset int64, value int) *IntCmd @@ -431,6 +436,7 @@ func (c statefulCmdable) AuthACL(ctx context.Context, username, password string) func (c cmdable) Wait(ctx context.Context, numSlaves int, timeout time.Duration) *IntCmd { cmd := NewIntCmd(ctx, "wait", numSlaves, int(timeout/time.Millisecond)) + cmd.setReadTimeout(timeout) _ = c(ctx, cmd) return cmd } @@ -525,7 +531,37 @@ func (c cmdable) Exists(ctx context.Context, keys ...string) *IntCmd { } func (c cmdable) Expire(ctx context.Context, key string, expiration time.Duration) *BoolCmd { - cmd := NewBoolCmd(ctx, "expire", key, formatSec(ctx, expiration)) + return c.expire(ctx, key, expiration, "") +} + +func (c cmdable) ExpireNX(ctx context.Context, key string, expiration time.Duration) *BoolCmd { + return c.expire(ctx, key, expiration, "NX") +} + +func (c cmdable) ExpireXX(ctx context.Context, key string, expiration time.Duration) *BoolCmd { + return c.expire(ctx, key, expiration, "XX") +} + +func (c cmdable) ExpireGT(ctx context.Context, key string, expiration time.Duration) *BoolCmd { + return c.expire(ctx, key, expiration, "GT") +} + +func (c cmdable) ExpireLT(ctx context.Context, key string, expiration time.Duration) *BoolCmd { + return c.expire(ctx, key, expiration, "LT") +} + +func (c cmdable) expire( + ctx context.Context, key string, expiration time.Duration, mode string, +) *BoolCmd { + args := make([]interface{}, 3, 4) + args[0] = "expire" + args[1] = key + args[2] = formatSec(ctx, expiration) + if mode != "" { + args = append(args, mode) + } + + cmd := NewBoolCmd(ctx, args...) _ = c(ctx, cmd) return cmd } @@ -990,6 +1026,16 @@ func (c cmdable) StrLen(ctx context.Context, key string) *IntCmd { return cmd } +func (c cmdable) Copy(ctx context.Context, sourceKey, destKey string, db int, replace bool) *IntCmd { + args := []interface{}{"copy", sourceKey, destKey, "DB", db} + if replace { + args = append(args, "REPLACE") + } + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + //------------------------------------------------------------------------------ func (c cmdable) GetBit(ctx context.Context, key string, offset int64) *IntCmd { @@ -1778,7 +1824,7 @@ type XReadArgs struct { } func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd { - args := make([]interface{}, 0, 5+len(a.Streams)) + args := make([]interface{}, 0, 6+len(a.Streams)) args = append(args, "xread") keyPos := int8(1) @@ -1802,7 +1848,7 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd { if a.Block >= 0 { cmd.setReadTimeout(a.Block) } - cmd.setFirstKeyPos(keyPos) + cmd.SetFirstKeyPos(keyPos) _ = c(ctx, cmd) return cmd } @@ -1860,7 +1906,7 @@ type XReadGroupArgs struct { } func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd { - args := make([]interface{}, 0, 8+len(a.Streams)) + args := make([]interface{}, 0, 10+len(a.Streams)) args = append(args, "xreadgroup", "group", a.Group, a.Consumer) keyPos := int8(4) @@ -1886,7 +1932,7 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic if a.Block >= 0 { cmd.setReadTimeout(a.Block) } - cmd.setFirstKeyPos(keyPos) + cmd.SetFirstKeyPos(keyPos) _ = c(ctx, cmd) return cmd } @@ -1957,7 +2003,7 @@ func (c cmdable) XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAuto } func xAutoClaimArgs(ctx context.Context, a *XAutoClaimArgs) []interface{} { - args := make([]interface{}, 0, 9) + args := make([]interface{}, 0, 8) args = append(args, "xautoclaim", a.Stream, a.Group, a.Consumer, formatMs(ctx, a.MinIdle), a.Start) if a.Count > 0 { args = append(args, "count", a.Count) @@ -1989,7 +2035,7 @@ func (c cmdable) XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCm } func xClaimArgs(a *XClaimArgs) []interface{} { - args := make([]interface{}, 0, 4+len(a.Messages)) + args := make([]interface{}, 0, 5+len(a.Messages)) args = append(args, "xclaim", a.Stream, @@ -2362,7 +2408,7 @@ func (c cmdable) ZInterStore(ctx context.Context, destination string, store *ZSt args = append(args, "zinterstore", destination, len(store.Keys)) args = store.appendArgs(args) cmd := NewIntCmd(ctx, args...) - cmd.setFirstKeyPos(3) + cmd.SetFirstKeyPos(3) _ = c(ctx, cmd) return cmd } @@ -2372,7 +2418,7 @@ func (c cmdable) ZInter(ctx context.Context, store *ZStore) *StringSliceCmd { args = append(args, "zinter", len(store.Keys)) args = store.appendArgs(args) cmd := NewStringSliceCmd(ctx, args...) - cmd.setFirstKeyPos(2) + cmd.SetFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -2383,7 +2429,7 @@ func (c cmdable) ZInterWithScores(ctx context.Context, store *ZStore) *ZSliceCmd args = store.appendArgs(args) args = append(args, "withscores") cmd := NewZSliceCmd(ctx, args...) - cmd.setFirstKeyPos(2) + cmd.SetFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -2710,7 +2756,7 @@ func (c cmdable) ZUnion(ctx context.Context, store ZStore) *StringSliceCmd { args = append(args, "zunion", len(store.Keys)) args = store.appendArgs(args) cmd := NewStringSliceCmd(ctx, args...) - cmd.setFirstKeyPos(2) + cmd.SetFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -2721,7 +2767,7 @@ func (c cmdable) ZUnionWithScores(ctx context.Context, store ZStore) *ZSliceCmd args = store.appendArgs(args) args = append(args, "withscores") cmd := NewZSliceCmd(ctx, args...) - cmd.setFirstKeyPos(2) + cmd.SetFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -2731,7 +2777,7 @@ func (c cmdable) ZUnionStore(ctx context.Context, dest string, store *ZStore) *I args = append(args, "zunionstore", dest, len(store.Keys)) args = store.appendArgs(args) cmd := NewIntCmd(ctx, args...) - cmd.setFirstKeyPos(3) + cmd.SetFirstKeyPos(3) _ = c(ctx, cmd) return cmd } @@ -2761,7 +2807,7 @@ func (c cmdable) ZDiff(ctx context.Context, keys ...string) *StringSliceCmd { } cmd := NewStringSliceCmd(ctx, args...) - cmd.setFirstKeyPos(2) + cmd.SetFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -2777,7 +2823,7 @@ func (c cmdable) ZDiffWithScores(ctx context.Context, keys ...string) *ZSliceCmd args[len(keys)+2] = "withscores" cmd := NewZSliceCmd(ctx, args...) - cmd.setFirstKeyPos(2) + cmd.SetFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -3053,7 +3099,7 @@ func (c cmdable) MemoryUsage(ctx context.Context, key string, samples ...int) *I args = append(args, "SAMPLES", samples[0]) } cmd := NewIntCmd(ctx, args...) - cmd.setFirstKeyPos(2) + cmd.SetFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -3070,7 +3116,7 @@ func (c cmdable) Eval(ctx context.Context, script string, keys []string, args .. } cmdArgs = appendArgs(cmdArgs, args) cmd := NewCmd(ctx, cmdArgs...) - cmd.setFirstKeyPos(3) + cmd.SetFirstKeyPos(3) _ = c(ctx, cmd) return cmd } @@ -3085,7 +3131,7 @@ func (c cmdable) EvalSha(ctx context.Context, sha1 string, keys []string, args . } cmdArgs = appendArgs(cmdArgs, args) cmd := NewCmd(ctx, cmdArgs...) - cmd.setFirstKeyPos(3) + cmd.SetFirstKeyPos(3) _ = c(ctx, cmd) return cmd } diff --git a/vendor/github.com/go-redis/redis/v8/error.go b/vendor/github.com/go-redis/redis/v8/error.go index 5025215..521594b 100644 --- a/vendor/github.com/go-redis/redis/v8/error.go +++ b/vendor/github.com/go-redis/redis/v8/error.go @@ -134,7 +134,7 @@ func isMovedSameConnAddr(err error, addr string) bool { if !strings.HasPrefix(redisError, "MOVED ") { return false } - return strings.HasSuffix(redisError, addr) + return strings.HasSuffix(redisError, " "+addr) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/go-redis/redis/v8/internal/proto/scan.go b/vendor/github.com/go-redis/redis/v8/internal/proto/scan.go index 7d7183c..0e99476 100644 --- a/vendor/github.com/go-redis/redis/v8/internal/proto/scan.go +++ b/vendor/github.com/go-redis/redis/v8/internal/proto/scan.go @@ -10,6 +10,7 @@ import ( ) // Scan parses bytes `b` to `v` with appropriate type. +//nolint:gocyclo func Scan(b []byte, v interface{}) error { switch v := v.(type) { case nil: @@ -105,6 +106,13 @@ func Scan(b []byte, v interface{}) error { var err error *v, err = time.Parse(time.RFC3339Nano, util.BytesToString(b)) return err + case *time.Duration: + n, err := util.ParseInt(b, 10, 64) + if err != nil { + return err + } + *v = time.Duration(n) + return nil case encoding.BinaryUnmarshaler: return v.UnmarshalBinary(b) default: diff --git a/vendor/github.com/go-redis/redis/v8/internal/proto/writer.go b/vendor/github.com/go-redis/redis/v8/internal/proto/writer.go index 81b09b8..c426098 100644 --- a/vendor/github.com/go-redis/redis/v8/internal/proto/writer.go +++ b/vendor/github.com/go-redis/redis/v8/internal/proto/writer.go @@ -98,6 +98,8 @@ func (w *Writer) WriteArg(v interface{}) error { case time.Time: w.numBuf = v.AppendFormat(w.numBuf[:0], time.RFC3339Nano) return w.bytes(w.numBuf) + case time.Duration: + return w.int(v.Nanoseconds()) case encoding.BinaryMarshaler: b, err := v.MarshalBinary() if err != nil { diff --git a/vendor/github.com/go-redis/redis/v8/package.json b/vendor/github.com/go-redis/redis/v8/package.json index dae7b9a..e4ea4bb 100644 --- a/vendor/github.com/go-redis/redis/v8/package.json +++ b/vendor/github.com/go-redis/redis/v8/package.json @@ -1,6 +1,6 @@ { "name": "redis", - "version": "8.11.4", + "version": "8.11.5", "main": "index.js", "repository": "git@github.com:go-redis/redis.git", "author": "Vladimir Mihailenco ", diff --git a/vendor/github.com/go-redis/redis/v8/pipeline.go b/vendor/github.com/go-redis/redis/v8/pipeline.go index c6ec340..31bab97 100644 --- a/vendor/github.com/go-redis/redis/v8/pipeline.go +++ b/vendor/github.com/go-redis/redis/v8/pipeline.go @@ -24,6 +24,7 @@ type pipelineExecer func(context.Context, []Cmder) error // depends of your batch size and/or use TxPipeline. type Pipeliner interface { StatefulCmdable + Len() int Do(ctx context.Context, args ...interface{}) *Cmd Process(ctx context.Context, cmd Cmder) error Close() error @@ -53,6 +54,15 @@ func (c *Pipeline) init() { c.statefulCmdable = c.Process } +// Len returns the number of queued commands. +func (c *Pipeline) Len() int { + c.mu.Lock() + ln := len(c.cmds) + c.mu.Unlock() + return ln +} + +// Do queues the custom command for later execution. func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd { cmd := NewCmd(ctx, args...) _ = c.Process(ctx, cmd) diff --git a/vendor/github.com/go-redis/redis/v8/ring.go b/vendor/github.com/go-redis/redis/v8/ring.go index 11a2037..4df00fc 100644 --- a/vendor/github.com/go-redis/redis/v8/ring.go +++ b/vendor/github.com/go-redis/redis/v8/ring.go @@ -576,7 +576,7 @@ func (c *Ring) cmdInfo(ctx context.Context, name string) *CommandInfo { } info := cmdsInfo[name] if info == nil { - internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name) + internal.Logger.Printf(ctx, "info for cmd=%s not found", name) } return info } diff --git a/vendor/github.com/go-redis/redis/v8/universal.go b/vendor/github.com/go-redis/redis/v8/universal.go index 1e962ab..c89b3e5 100644 --- a/vendor/github.com/go-redis/redis/v8/universal.go +++ b/vendor/github.com/go-redis/redis/v8/universal.go @@ -25,6 +25,7 @@ type UniversalOptions struct { Username string Password string + SentinelUsername string SentinelPassword string MaxRetries int @@ -114,6 +115,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions { DB: o.DB, Username: o.Username, Password: o.Password, + SentinelUsername: o.SentinelUsername, SentinelPassword: o.SentinelPassword, MaxRetries: o.MaxRetries, diff --git a/vendor/github.com/go-redis/redis/v8/version.go b/vendor/github.com/go-redis/redis/v8/version.go index ed47903..112c9a2 100644 --- a/vendor/github.com/go-redis/redis/v8/version.go +++ b/vendor/github.com/go-redis/redis/v8/version.go @@ -2,5 +2,5 @@ package redis // Version is the current release version. func Version() string { - return "8.11.4" + return "8.11.5" } diff --git a/vendor/modules.txt b/vendor/modules.txt index 1af6840..2b7a56f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -4,8 +4,8 @@ github.com/cespare/xxhash/v2 # github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f ## explicit github.com/dgryski/go-rendezvous -# github.com/go-redis/redis/v8 v8.11.4 -## explicit; go 1.13 +# github.com/go-redis/redis/v8 v8.11.5 +## explicit; go 1.17 github.com/go-redis/redis/v8 github.com/go-redis/redis/v8/internal github.com/go-redis/redis/v8/internal/hashtag