From eee2d1c5465e4197201e90646f4e613f66a755e4 Mon Sep 17 00:00:00 2001 From: Junyu Liu Date: Fri, 22 Nov 2024 17:41:33 +0800 Subject: [PATCH] refactor(core): move storage package to core module (#813) * refactor(core): move storage package to core module * rename * lint --- app/chat/export.go | 5 +- app/chat/ls.go | 5 +- app/chat/users.go | 5 +- app/dl/dl.go | 19 +++--- app/dl/serve.go | 5 +- app/forward/forward.go | 5 +- app/internal/tctx/tctx.go | 8 +-- app/login/code.go | 2 +- app/login/desktop.go | 4 +- app/login/qr.go | 2 +- app/up/up.go | 5 +- cmd/chat.go | 8 +-- cmd/dl.go | 4 +- cmd/extension.go | 2 +- cmd/forward.go | 4 +- cmd/root.go | 7 ++- cmd/up.go | 4 +- core/storage/keygen/keygen.go | 25 ++++++++ core/storage/peers.go | 105 +++++++++++++++++++++++++++++++++ core/storage/session.go | 42 +++++++++++++ {pkg => core}/storage/state.go | 50 +++++++++------- core/storage/storage.go | 15 +++++ pkg/key/key.go | 53 +---------------- pkg/kv/bolt.go | 3 +- pkg/kv/file.go | 12 ++-- pkg/kv/kv.go | 12 +--- pkg/kv/kv_test.go | 5 +- pkg/kv/legacy.go | 12 ++-- pkg/storage/peers.go | 94 ----------------------------- pkg/storage/session.go | 39 ------------ pkg/tclient/tclient.go | 9 ++- pkg/tmessage/files.go | 9 ++- pkg/tmessage/urls.go | 5 +- test/testserver/testserver.go | 2 +- 34 files changed, 296 insertions(+), 290 deletions(-) create mode 100644 core/storage/keygen/keygen.go create mode 100644 core/storage/peers.go create mode 100644 core/storage/session.go rename {pkg => core}/storage/state.go (65%) create mode 100644 core/storage/storage.go delete mode 100644 pkg/storage/peers.go delete mode 100644 pkg/storage/session.go diff --git a/app/chat/export.go b/app/chat/export.go index ed8222e..5916959 100644 --- a/app/chat/export.go +++ b/app/chat/export.go @@ -18,11 +18,10 @@ import ( "github.com/jedib0t/go-pretty/v6/progress" "go.uber.org/multierr" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/tmedia" "github.com/iyear/tdl/core/util/tutil" - "github.com/iyear/tdl/pkg/kv" "github.com/iyear/tdl/pkg/prog" - "github.com/iyear/tdl/pkg/storage" "github.com/iyear/tdl/pkg/texpr" ) @@ -54,7 +53,7 @@ type Message struct { // ENUM(time, id, last) type ExportType int -func Export(ctx context.Context, c *telegram.Client, kvd kv.KV, opts ExportOptions) (rerr error) { +func Export(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts ExportOptions) (rerr error) { // only output available fields if opts.Filter == "-" { fg := texpr.NewFieldsGetter(nil) diff --git a/app/chat/ls.go b/app/chat/ls.go index e2e2d39..dafe57a 100644 --- a/app/chat/ls.go +++ b/app/chat/ls.go @@ -18,9 +18,8 @@ import ( "go.uber.org/zap" "github.com/iyear/tdl/core/logctx" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/util/tutil" - "github.com/iyear/tdl/pkg/kv" - "github.com/iyear/tdl/pkg/storage" "github.com/iyear/tdl/pkg/texpr" ) @@ -56,7 +55,7 @@ type ListOptions struct { Filter string } -func List(ctx context.Context, c *telegram.Client, kvd kv.KV, opts ListOptions) error { +func List(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts ListOptions) error { log := logctx.From(ctx) // align output diff --git a/app/chat/users.go b/app/chat/users.go index 9146cf3..17abbcd 100644 --- a/app/chat/users.go +++ b/app/chat/users.go @@ -18,10 +18,9 @@ import ( "github.com/jedib0t/go-pretty/v6/progress" "go.uber.org/multierr" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/util/tutil" - "github.com/iyear/tdl/pkg/kv" "github.com/iyear/tdl/pkg/prog" - "github.com/iyear/tdl/pkg/storage" ) type UsersOptions struct { @@ -38,7 +37,7 @@ type User struct { LastName string `json:"last_name"` } -func Users(ctx context.Context, c *telegram.Client, kvd kv.KV, opts UsersOptions) (rerr error) { +func Users(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts UsersOptions) (rerr error) { manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(c.API()) if opts.Chat == "" { return fmt.Errorf("missing domain id") diff --git a/app/dl/dl.go b/app/dl/dl.go index 7aa5ab4..7d31dde 100644 --- a/app/dl/dl.go +++ b/app/dl/dl.go @@ -17,12 +17,11 @@ import ( "github.com/iyear/tdl/core/dcpool" "github.com/iyear/tdl/core/downloader" "github.com/iyear/tdl/core/logctx" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/tclient" "github.com/iyear/tdl/pkg/consts" "github.com/iyear/tdl/pkg/key" - "github.com/iyear/tdl/pkg/kv" "github.com/iyear/tdl/pkg/prog" - "github.com/iyear/tdl/pkg/storage" "github.com/iyear/tdl/pkg/tmessage" "github.com/iyear/tdl/pkg/utils" ) @@ -53,7 +52,7 @@ type parser struct { Parser tmessage.ParseSource } -func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr error) { +func Run(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts Options) (rerr error) { pool := dcpool.NewPool(c, int64(viper.GetInt(consts.FlagPoolSize)), tclient.NewDefaultMiddlewares(ctx, viper.GetDuration(consts.FlagReconnectTimeout))...) @@ -94,7 +93,7 @@ func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr if rerr != nil { // download is interrupted multierr.AppendInto(&rerr, saveProgress(ctx, kvd, it)) } else { // if finished, we should clear resume key - multierr.AppendInto(&rerr, kvd.Delete(key.Resume(it.Fingerprint()))) + multierr.AppendInto(&rerr, kvd.Delete(ctx, key.Resume(it.Fingerprint()))) } }() @@ -137,12 +136,12 @@ func collectDialogs(parsers []parser) ([][]*tmessage.Dialog, error) { return dialogs, nil } -func resume(ctx context.Context, kvd kv.KV, iter *iter, ask bool) error { +func resume(ctx context.Context, kvd storage.Storage, iter *iter, ask bool) error { logctx.From(ctx).Debug("Check resume key", zap.String("fingerprint", iter.Fingerprint())) - b, err := kvd.Get(key.Resume(iter.Fingerprint())) - if err != nil && !errors.Is(err, kv.ErrNotFound) { + b, err := kvd.Get(ctx, key.Resume(iter.Fingerprint())) + if err != nil && !errors.Is(err, storage.ErrNotFound) { return err } if len(b) == 0 { // no progress @@ -177,14 +176,14 @@ func resume(ctx context.Context, kvd kv.KV, iter *iter, ask bool) error { if !confirm { // clear resume key - return kvd.Delete(key.Resume(iter.Fingerprint())) + return kvd.Delete(ctx, key.Resume(iter.Fingerprint())) } iter.SetFinished(finished) return nil } -func saveProgress(ctx context.Context, kvd kv.KV, it *iter) error { +func saveProgress(ctx context.Context, kvd storage.Storage, it *iter) error { finished := it.Finished() logctx.From(ctx).Debug("Save progress", zap.Int("finished", len(finished))) @@ -193,5 +192,5 @@ func saveProgress(ctx context.Context, kvd kv.KV, it *iter) error { if err != nil { return err } - return kvd.Set(key.Resume(it.Fingerprint()), b) + return kvd.Set(ctx, key.Resume(it.Fingerprint()), b) } diff --git a/app/dl/serve.go b/app/dl/serve.go index f4b6459..6575096 100644 --- a/app/dl/serve.go +++ b/app/dl/serve.go @@ -22,11 +22,10 @@ import ( "github.com/iyear/tdl/core/dcpool" "github.com/iyear/tdl/core/logctx" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/tmedia" "github.com/iyear/tdl/core/util/tutil" "github.com/iyear/tdl/pkg/consts" - "github.com/iyear/tdl/pkg/kv" - "github.com/iyear/tdl/pkg/storage" "github.com/iyear/tdl/pkg/tmessage" ) @@ -39,7 +38,7 @@ type media struct { var tmpl string func serve(ctx context.Context, - kvd kv.KV, + kvd storage.Storage, pool dcpool.Pool, dialogs [][]*tmessage.Dialog, port int, diff --git a/app/forward/forward.go b/app/forward/forward.go index 0b6a357..e46d5e4 100644 --- a/app/forward/forward.go +++ b/app/forward/forward.go @@ -19,12 +19,11 @@ import ( "github.com/iyear/tdl/app/internal/tctx" "github.com/iyear/tdl/core/dcpool" "github.com/iyear/tdl/core/forwarder" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/tclient" "github.com/iyear/tdl/core/util/tutil" "github.com/iyear/tdl/pkg/consts" - "github.com/iyear/tdl/pkg/kv" "github.com/iyear/tdl/pkg/prog" - "github.com/iyear/tdl/pkg/storage" "github.com/iyear/tdl/pkg/texpr" "github.com/iyear/tdl/pkg/tmessage" ) @@ -40,7 +39,7 @@ type Options struct { Desc bool } -func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr error) { +func Run(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts Options) (rerr error) { if opts.To == "-" || opts.Edit == "-" { fg := texpr.NewFieldsGetter(nil) diff --git a/app/internal/tctx/tctx.go b/app/internal/tctx/tctx.go index 979230b..af38d09 100644 --- a/app/internal/tctx/tctx.go +++ b/app/internal/tctx/tctx.go @@ -4,16 +4,16 @@ import ( "context" "github.com/iyear/tdl/core/dcpool" - "github.com/iyear/tdl/pkg/kv" + "github.com/iyear/tdl/core/storage" ) type kvKey struct{} -func KV(ctx context.Context) kv.KV { - return ctx.Value(kvKey{}).(kv.KV) +func KV(ctx context.Context) storage.Storage { + return ctx.Value(kvKey{}).(storage.Storage) } -func WithKV(ctx context.Context, kv kv.KV) context.Context { +func WithKV(ctx context.Context, kv storage.Storage) context.Context { return context.WithValue(ctx, kvKey{}, kv) } diff --git a/app/login/code.go b/app/login/code.go index 3854a12..3905eba 100644 --- a/app/login/code.go +++ b/app/login/code.go @@ -23,7 +23,7 @@ func Code(ctx context.Context) error { return errors.Wrap(err, "open kv") } - if err = kvd.Set(key.App(), []byte(tclient.AppDesktop)); err != nil { + if err = kvd.Set(ctx, key.App(), []byte(tclient.AppDesktop)); err != nil { return errors.Wrap(err, "set app") } diff --git a/app/login/desktop.go b/app/login/desktop.go index ec371f4..ed1620f 100644 --- a/app/login/desktop.go +++ b/app/login/desktop.go @@ -14,11 +14,11 @@ import ( tdtdesktop "github.com/gotd/td/session/tdesktop" "github.com/spf13/viper" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/util/fsutil" "github.com/iyear/tdl/pkg/consts" "github.com/iyear/tdl/pkg/key" "github.com/iyear/tdl/pkg/kv" - "github.com/iyear/tdl/pkg/storage" "github.com/iyear/tdl/pkg/tclient" "github.com/iyear/tdl/pkg/tdesktop" "github.com/iyear/tdl/pkg/tpath" @@ -74,7 +74,7 @@ func Desktop(ctx context.Context, opts Options) error { return err } - if err = kvd.Set(key.App(), []byte(tclient.AppDesktop)); err != nil { + if err = kvd.Set(ctx, key.App(), []byte(tclient.AppDesktop)); err != nil { return err } diff --git a/app/login/qr.go b/app/login/qr.go index 8bb4603..4f2d5da 100644 --- a/app/login/qr.go +++ b/app/login/qr.go @@ -27,7 +27,7 @@ func QR(ctx context.Context) error { return errors.Wrap(err, "open kv") } - if err = kvd.Set(key.App(), []byte(tclient.AppDesktop)); err != nil { + if err = kvd.Set(ctx, key.App(), []byte(tclient.AppDesktop)); err != nil { return errors.Wrap(err, "set app") } diff --git a/app/up/up.go b/app/up/up.go index b1ad2fe..bea93ff 100644 --- a/app/up/up.go +++ b/app/up/up.go @@ -12,13 +12,12 @@ import ( "go.uber.org/multierr" "github.com/iyear/tdl/core/dcpool" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/tclient" "github.com/iyear/tdl/core/uploader" "github.com/iyear/tdl/core/util/tutil" "github.com/iyear/tdl/pkg/consts" - "github.com/iyear/tdl/pkg/kv" "github.com/iyear/tdl/pkg/prog" - "github.com/iyear/tdl/pkg/storage" "github.com/iyear/tdl/pkg/utils" ) @@ -30,7 +29,7 @@ type Options struct { Photo bool } -func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr error) { +func Run(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts Options) (rerr error) { files, err := walk(opts.Paths, opts.Excludes) if err != nil { return err diff --git a/cmd/chat.go b/cmd/chat.go index 2cb7e8f..8c8abb3 100644 --- a/cmd/chat.go +++ b/cmd/chat.go @@ -14,7 +14,7 @@ import ( "github.com/iyear/tdl/app/chat" "github.com/iyear/tdl/core/logctx" - "github.com/iyear/tdl/pkg/kv" + "github.com/iyear/tdl/core/storage" ) var limiter = ratelimit.New(rate.Every(500*time.Millisecond), 2) @@ -38,7 +38,7 @@ func NewChatList() *cobra.Command { Use: "ls", Short: "List your chats", RunE: func(cmd *cobra.Command, args []string) error { - return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error { + return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error { return chat.List(logctx.Named(ctx, "ls"), c, kvd, opts) }, limiter) }, @@ -83,7 +83,7 @@ func NewChatExport() *cobra.Command { return fmt.Errorf("unknown export type: %s", opts.Type) } - return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error { + return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error { return chat.Export(logctx.Named(ctx, "export"), c, kvd, opts) }, limiter) }, @@ -138,7 +138,7 @@ func NewChatUsers() *cobra.Command { Use: "users", Short: "export users from (protected) channels", RunE: func(cmd *cobra.Command, args []string) error { - return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error { + return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error { return chat.Users(logctx.Named(ctx, "users"), c, kvd, opts) }, limiter) }, diff --git a/cmd/dl.go b/cmd/dl.go index b42d7d4..6540ee8 100644 --- a/cmd/dl.go +++ b/cmd/dl.go @@ -10,8 +10,8 @@ import ( "github.com/iyear/tdl/app/dl" "github.com/iyear/tdl/core/logctx" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/pkg/consts" - "github.com/iyear/tdl/pkg/kv" ) func NewDownload() *cobra.Command { @@ -29,7 +29,7 @@ func NewDownload() *cobra.Command { opts.Template = viper.GetString(consts.FlagDlTemplate) - return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error { + return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error { return dl.Run(logctx.Named(ctx, "dl"), c, kvd, opts) }) }, diff --git a/cmd/extension.go b/cmd/extension.go index 7e68a33..bbc1811 100644 --- a/cmd/extension.go +++ b/cmd/extension.go @@ -12,10 +12,10 @@ import ( "github.com/spf13/viper" "github.com/iyear/tdl/app/extension" + "github.com/iyear/tdl/core/storage" extbase "github.com/iyear/tdl/extension" "github.com/iyear/tdl/pkg/consts" "github.com/iyear/tdl/pkg/extensions" - "github.com/iyear/tdl/pkg/storage" "github.com/iyear/tdl/pkg/tclient" ) diff --git a/cmd/forward.go b/cmd/forward.go index e7025e7..ae2087d 100644 --- a/cmd/forward.go +++ b/cmd/forward.go @@ -11,7 +11,7 @@ import ( "github.com/iyear/tdl/app/forward" "github.com/iyear/tdl/core/forwarder" "github.com/iyear/tdl/core/logctx" - "github.com/iyear/tdl/pkg/kv" + "github.com/iyear/tdl/core/storage" ) func NewForward() *cobra.Command { @@ -22,7 +22,7 @@ func NewForward() *cobra.Command { Short: "Forward messages with automatic fallback and message routing", GroupID: groupTools.ID, RunE: func(cmd *cobra.Command, args []string) error { - return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error { + return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error { return forward.Run(logctx.Named(ctx, "forward"), c, kvd, opts) }) }, diff --git a/cmd/root.go b/cmd/root.go index c45c4f4..bd14a63 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -19,6 +19,7 @@ import ( "golang.org/x/net/proxy" "github.com/iyear/tdl/core/logctx" + "github.com/iyear/tdl/core/storage" tclientcore "github.com/iyear/tdl/core/tclient" "github.com/iyear/tdl/core/util/fsutil" "github.com/iyear/tdl/core/util/logutil" @@ -91,12 +92,12 @@ func New() *cobra.Command { } } - storage, err := kv.NewWithMap(viper.GetStringMapString(consts.FlagStorage)) + stg, err := kv.NewWithMap(viper.GetStringMapString(consts.FlagStorage)) if err != nil { return errors.Wrap(err, "create kv storage") } - cmd.SetContext(kv.With(cmd.Context(), storage)) + cmd.SetContext(kv.With(cmd.Context(), stg)) // extension manager client proxy var dialer proxy.ContextDialer = proxy.Direct @@ -228,7 +229,7 @@ func tOptions(ctx context.Context) (tclient.Options, error) { return o, nil } -func tRun(ctx context.Context, f func(ctx context.Context, c *telegram.Client, kvd kv.KV) error, middlewares ...telegram.Middleware) error { +func tRun(ctx context.Context, f func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error, middlewares ...telegram.Middleware) error { o, err := tOptions(ctx) if err != nil { return errors.Wrap(err, "build telegram options") diff --git a/cmd/up.go b/cmd/up.go index eb65be8..a3994e7 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -8,7 +8,7 @@ import ( "github.com/iyear/tdl/app/up" "github.com/iyear/tdl/core/logctx" - "github.com/iyear/tdl/pkg/kv" + "github.com/iyear/tdl/core/storage" ) func NewUpload() *cobra.Command { @@ -20,7 +20,7 @@ func NewUpload() *cobra.Command { Short: "Upload anything to Telegram", GroupID: groupTools.ID, RunE: func(cmd *cobra.Command, args []string) error { - return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error { + return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error { return up.Run(logctx.Named(ctx, "up"), c, kvd, opts) }) }, diff --git a/core/storage/keygen/keygen.go b/core/storage/keygen/keygen.go new file mode 100644 index 0000000..1e815b8 --- /dev/null +++ b/core/storage/keygen/keygen.go @@ -0,0 +1,25 @@ +package keygen + +import ( + "bytes" + "strings" + "sync" +) + +var keyPool = sync.Pool{ + New: func() interface{} { + b := &bytes.Buffer{} + b.Grow(16) + return b + }, +} + +func New(indexes ...string) string { + buf := keyPool.Get().(*bytes.Buffer) + buf.WriteString(strings.Join(indexes, ":")) + + t := buf.String() + buf.Reset() + keyPool.Put(buf) + return t +} diff --git a/core/storage/peers.go b/core/storage/peers.go new file mode 100644 index 0000000..6630158 --- /dev/null +++ b/core/storage/peers.go @@ -0,0 +1,105 @@ +package storage + +import ( + "context" + "encoding/json" + "errors" + "strconv" + + "github.com/gotd/td/telegram/peers" + + "github.com/iyear/tdl/core/storage/keygen" +) + +type Peers struct { + kv Storage +} + +func NewPeers(kv Storage) peers.Storage { + return &Peers{kv: kv} +} + +func (p *Peers) Save(ctx context.Context, key peers.Key, value peers.Value) error { + bytes, err := json.Marshal(value) + if err != nil { + return err + } + + return p.kv.Set(ctx, p.key(key), bytes) +} + +func (p *Peers) Find(ctx context.Context, key peers.Key) (peers.Value, bool, error) { + data, err := p.kv.Get(ctx, p.key(key)) + if err != nil { + if errors.Is(err, ErrNotFound) { + return peers.Value{}, false, nil + } + return peers.Value{}, false, err + } + + var value peers.Value + if err = json.Unmarshal(data, &value); err != nil { + return peers.Value{}, false, err + } + + return value, true, nil +} + +func (p *Peers) SavePhone(ctx context.Context, phone string, _key peers.Key) error { + bytes, err := json.Marshal(_key) + if err != nil { + return err + } + + return p.kv.Set(ctx, p.phoneKey(phone), bytes) +} + +func (p *Peers) FindPhone(ctx context.Context, phone string) (peers.Key, peers.Value, bool, error) { + data, err := p.kv.Get(ctx, p.phoneKey(phone)) + if err != nil { + if errors.Is(err, ErrNotFound) { + return peers.Key{}, peers.Value{}, false, nil + } + return peers.Key{}, peers.Value{}, false, err + } + + var _key peers.Key + if err = json.Unmarshal(data, &_key); err != nil { + return peers.Key{}, peers.Value{}, false, err + } + + value, found, err := p.Find(ctx, _key) + if err != nil { + return peers.Key{}, peers.Value{}, false, err + } + + return _key, value, found, nil +} + +func (p *Peers) GetContactsHash(ctx context.Context) (int64, error) { + data, err := p.kv.Get(ctx, p.contactsKey()) + if err != nil { + if errors.Is(err, ErrNotFound) { + return 0, nil + } + return 0, err + } + + return strconv.ParseInt(string(data), 10, 64) +} + +func (p *Peers) SaveContactsHash(ctx context.Context, hash int64) error { + return p.kv.Set(ctx, p.contactsKey(), []byte(strconv.FormatInt(hash, 10))) +} + +func (p *Peers) key(key peers.Key) string { + return keygen.New("peers", "key", key.Prefix, strconv.FormatInt(key.ID, 10)) +} + +func (p *Peers) phoneKey(phone string) string { + return keygen.New("peers", "phone", phone) +} + +func (p *Peers) contactsKey() string { + return keygen.New("peers", "contacts", "hash") +} diff --git a/core/storage/session.go b/core/storage/session.go new file mode 100644 index 0000000..5d6384c --- /dev/null +++ b/core/storage/session.go @@ -0,0 +1,42 @@ +package storage + +import ( + "context" + "errors" + + "github.com/gotd/td/telegram" + + "github.com/iyear/tdl/core/storage/keygen" +) + +type Session struct { + kv Storage + login bool +} + +func NewSession(kv Storage, login bool) telegram.SessionStorage { + return &Session{kv: kv, login: login} +} + +func (s *Session) LoadSession(ctx context.Context) ([]byte, error) { + if s.login { + return nil, nil + } + + b, err := s.kv.Get(ctx, s.key()) + if err != nil { + if errors.Is(err, ErrNotFound) { + return nil, nil + } + return nil, err + } + return b, nil +} + +func (s *Session) StoreSession(ctx context.Context, data []byte) error { + return s.kv.Set(ctx, s.key(), data) +} + +func (s *Session) key() string { + return keygen.New("session") +} diff --git a/pkg/storage/state.go b/core/storage/state.go similarity index 65% rename from pkg/storage/state.go rename to core/storage/state.go index 0e837a7..37e816e 100644 --- a/pkg/storage/state.go +++ b/core/storage/state.go @@ -4,23 +4,23 @@ import ( "context" "encoding/json" "errors" + "strconv" "github.com/gotd/td/telegram/updates" - "github.com/iyear/tdl/pkg/key" - "github.com/iyear/tdl/pkg/kv" + "github.com/iyear/tdl/core/storage/keygen" ) type State struct { - kv kv.KV + kv Storage } -func NewState(kv kv.KV) updates.StateStorage { +func NewState(kv Storage) updates.StateStorage { return &State{kv: kv} } -func (s *State) Get(_ context.Context, key string, v interface{}) error { - data, err := s.kv.Get(key) +func (s *State) Get(ctx context.Context, key string, v interface{}) error { + data, err := s.kv.Get(ctx, key) if err != nil { return err } @@ -28,20 +28,20 @@ func (s *State) Get(_ context.Context, key string, v interface{}) error { return json.Unmarshal(data, v) } -func (s *State) Set(_ context.Context, key string, v interface{}) error { +func (s *State) Set(ctx context.Context, key string, v interface{}) error { data, err := json.Marshal(v) if err != nil { return err } - return s.kv.Set(key, data) + return s.kv.Set(ctx, key, data) } func (s *State) GetState(ctx context.Context, userID int64) (updates.State, bool, error) { state := updates.State{} - if err := s.Get(ctx, key.State(userID), &state); err != nil { - if errors.Is(err, kv.ErrNotFound) { + if err := s.Get(ctx, s.stateKey(userID), &state); err != nil { + if errors.Is(err, ErrNotFound) { return state, false, nil } return state, false, err @@ -51,15 +51,15 @@ func (s *State) GetState(ctx context.Context, userID int64) (updates.State, bool } func (s *State) SetState(ctx context.Context, userID int64, state updates.State) error { - if err := s.Set(ctx, key.State(userID), state); err != nil { + if err := s.Set(ctx, s.stateKey(userID), state); err != nil { return err } - return s.Set(ctx, key.StateChannel(userID), struct{}{}) + return s.Set(ctx, s.channelKey(userID), struct{}{}) } func (s *State) SetPts(ctx context.Context, userID int64, pts int) error { - state, k := updates.State{}, key.State(userID) + state, k := updates.State{}, s.stateKey(userID) if err := s.Get(ctx, k, &state); err != nil { return err @@ -69,7 +69,7 @@ func (s *State) SetPts(ctx context.Context, userID int64, pts int) error { } func (s *State) SetQts(ctx context.Context, userID int64, qts int) error { - state, k := updates.State{}, key.State(userID) + state, k := updates.State{}, s.stateKey(userID) if err := s.Get(ctx, k, &state); err != nil { return err @@ -79,7 +79,7 @@ func (s *State) SetQts(ctx context.Context, userID int64, qts int) error { } func (s *State) SetDate(ctx context.Context, userID int64, date int) error { - state, k := updates.State{}, key.State(userID) + state, k := updates.State{}, s.stateKey(userID) if err := s.Get(ctx, k, &state); err != nil { return err @@ -89,7 +89,7 @@ func (s *State) SetDate(ctx context.Context, userID int64, date int) error { } func (s *State) SetSeq(ctx context.Context, userID int64, seq int) error { - state, k := updates.State{}, key.State(userID) + state, k := updates.State{}, s.stateKey(userID) if err := s.Get(ctx, k, &state); err != nil { return err @@ -99,7 +99,7 @@ func (s *State) SetSeq(ctx context.Context, userID int64, seq int) error { } func (s *State) SetDateSeq(ctx context.Context, userID int64, date, seq int) error { - state, k := updates.State{}, key.State(userID) + state, k := updates.State{}, s.stateKey(userID) if err := s.Get(ctx, k, &state); err != nil { return err @@ -112,8 +112,8 @@ func (s *State) SetDateSeq(ctx context.Context, userID int64, date, seq int) err func (s *State) GetChannelPts(ctx context.Context, userID, channelID int64) (int, bool, error) { c := make(map[int64]int) - if err := s.Get(ctx, key.StateChannel(userID), &c); err != nil { - if errors.Is(err, kv.ErrNotFound) { + if err := s.Get(ctx, s.channelKey(userID), &c); err != nil { + if errors.Is(err, ErrNotFound) { return 0, false, nil } return 0, false, err @@ -128,7 +128,7 @@ func (s *State) GetChannelPts(ctx context.Context, userID, channelID int64) (int } func (s *State) SetChannelPts(ctx context.Context, userID, channelID int64, pts int) error { - c, k := make(map[int64]int), key.StateChannel(userID) + c, k := make(map[int64]int), s.channelKey(userID) if err := s.Get(ctx, k, &c); err != nil { return err @@ -140,7 +140,7 @@ func (s *State) SetChannelPts(ctx context.Context, userID, channelID int64, pts func (s *State) ForEachChannels(ctx context.Context, userID int64, f func(ctx context.Context, channelID int64, pts int) error) error { c := make(map[int64]int) - if err := s.Get(ctx, key.StateChannel(userID), &c); err != nil { + if err := s.Get(ctx, s.channelKey(userID), &c); err != nil { return err } @@ -152,3 +152,11 @@ func (s *State) ForEachChannels(ctx context.Context, userID int64, f func(ctx co return nil } + +func (s *State) stateKey(userID int64) string { + return keygen.New("state", strconv.FormatInt(userID, 10)) +} + +func (s *State) channelKey(userID int64) string { + return keygen.New("chan", strconv.FormatInt(userID, 10)) +} diff --git a/core/storage/storage.go b/core/storage/storage.go new file mode 100644 index 0000000..d7eb91f --- /dev/null +++ b/core/storage/storage.go @@ -0,0 +1,15 @@ +package storage + +import ( + "context" + + "github.com/go-faster/errors" +) + +type Storage interface { + Get(ctx context.Context, key string) ([]byte, error) + Set(ctx context.Context, key string, value []byte) error + Delete(ctx context.Context, key string) error +} + +var ErrNotFound = errors.New("key not found") diff --git a/pkg/key/key.go b/pkg/key/key.go index 72b6400..182dcd8 100644 --- a/pkg/key/key.go +++ b/pkg/key/key.go @@ -1,60 +1,13 @@ package key import ( - "bytes" - "strconv" - "strings" - "sync" - - "github.com/gotd/td/telegram/peers" + "github.com/iyear/tdl/core/storage/keygen" ) -var keyPool = sync.Pool{ - New: func() interface{} { - b := &bytes.Buffer{} - b.Grow(16) - return b - }, -} - -func New(indexes ...string) string { - buf := keyPool.Get().(*bytes.Buffer) - buf.WriteString(strings.Join(indexes, ":")) - - t := buf.String() - buf.Reset() - keyPool.Put(buf) - return t -} - -func Session() string { - return New("session") -} - func App() string { - return New("app") -} - -func State(userID int64) string { - return New("state", strconv.FormatInt(userID, 10)) -} - -func StateChannel(userID int64) string { - return New("chan", strconv.FormatInt(userID, 10)) -} - -func PeersKey(key peers.Key) string { - return New("peers", "key", key.Prefix, strconv.FormatInt(key.ID, 10)) -} - -func PeersPhone(phone string) string { - return New("peers", "phone", phone) -} - -func PeersContactsHash() string { - return New("peers", "contacts", "hash") + return keygen.New("app") } func Resume(fingerprint string) string { - return New("resume", fingerprint) + return keygen.New("resume", fingerprint) } diff --git a/pkg/kv/bolt.go b/pkg/kv/bolt.go index d248cf8..bc6dd04 100644 --- a/pkg/kv/bolt.go +++ b/pkg/kv/bolt.go @@ -10,6 +10,7 @@ import ( "go.etcd.io/bbolt" "go.uber.org/multierr" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/pkg/validator" ) @@ -128,7 +129,7 @@ func (b *bolt) walk(fn func(path string) error) error { }) } -func (b *bolt) Open(ns string) (KV, error) { +func (b *bolt) Open(ns string) (storage.Storage, error) { return b.open(ns) } diff --git a/pkg/kv/file.go b/pkg/kv/file.go index b9dac4c..2720b29 100644 --- a/pkg/kv/file.go +++ b/pkg/kv/file.go @@ -1,6 +1,7 @@ package kv import ( + "context" "encoding/json" "os" "path/filepath" @@ -9,6 +10,7 @@ import ( "github.com/go-faster/errors" "github.com/mitchellh/mapstructure" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/pkg/validator" ) @@ -84,7 +86,7 @@ func (f *file) Namespaces() ([]string, error) { return namespaces, nil } -func (f *file) Open(ns string) (KV, error) { +func (f *file) Open(ns string) (storage.Storage, error) { if ns == "" { return nil, errors.New("namespace is required") } @@ -142,7 +144,7 @@ type fileKV struct { ns string } -func (f *fileKV) Get(key string) ([]byte, error) { +func (f *fileKV) Get(_ context.Context, key string) ([]byte, error) { m, err := f.f.read() if err != nil { return nil, errors.Wrap(err, "read") @@ -151,10 +153,10 @@ func (f *fileKV) Get(key string) ([]byte, error) { if v, ok := m[f.ns][key]; ok { return v, nil } - return nil, ErrNotFound + return nil, storage.ErrNotFound } -func (f *fileKV) Set(key string, value []byte) error { +func (f *fileKV) Set(_ context.Context, key string, value []byte) error { m, err := f.f.read() if err != nil { return errors.Wrap(err, "read") @@ -165,7 +167,7 @@ func (f *fileKV) Set(key string, value []byte) error { return f.f.write(m) } -func (f *fileKV) Delete(key string) error { +func (f *fileKV) Delete(_ context.Context, key string) error { m, err := f.f.read() if err != nil { return errors.Wrap(err, "read") diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index 4b40366..ebc0ef8 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -5,6 +5,8 @@ import ( "io" "github.com/go-faster/errors" + + "github.com/iyear/tdl/core/storage" ) //go:generate go-enum --values --names --flag --nocase @@ -15,8 +17,6 @@ type Driver string const DriverTypeKey = "type" -var ErrNotFound = errors.New("key not found") - type Meta map[string]map[string][]byte // namespace, key, value type Storage interface { @@ -24,16 +24,10 @@ type Storage interface { MigrateTo() (Meta, error) MigrateFrom(Meta) error Namespaces() ([]string, error) - Open(ns string) (KV, error) + Open(ns string) (storage.Storage, error) io.Closer } -type KV interface { - Get(key string) ([]byte, error) - Set(key string, value []byte) error - Delete(key string) error -} - var drivers = map[Driver]func(map[string]any) (Storage, error){} func register(name Driver, fn func(map[string]any) (Storage, error)) { diff --git a/pkg/kv/kv_test.go b/pkg/kv/kv_test.go index c3779f8..bae14a1 100644 --- a/pkg/kv/kv_test.go +++ b/pkg/kv/kv_test.go @@ -1,6 +1,7 @@ package kv import ( + "context" "fmt" "path/filepath" "testing" @@ -113,7 +114,7 @@ func TestStorage_MigrateTo(t *testing.T) { require.NotNil(t, kv) for key, value := range pairs { - require.NoError(t, kv.Set(key, value)) + require.NoError(t, kv.Set(context.TODO(), key, value)) } } @@ -146,7 +147,7 @@ func TestStorage_MigrateFrom(t *testing.T) { require.NotNil(t, kv) for key, value := range pairs { - v, err := kv.Get(key) + v, err := kv.Get(context.TODO(), key) require.NoError(t, err) require.Equal(t, value, v) } diff --git a/pkg/kv/legacy.go b/pkg/kv/legacy.go index 537071d..960e2e3 100644 --- a/pkg/kv/legacy.go +++ b/pkg/kv/legacy.go @@ -1,6 +1,7 @@ package kv import ( + "context" "os" "time" @@ -8,6 +9,7 @@ import ( "github.com/mitchellh/mapstructure" "go.etcd.io/bbolt" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/pkg/validator" ) @@ -102,7 +104,7 @@ func (l *legacy) Namespaces() ([]string, error) { return namespaces, nil } -func (l *legacy) Open(ns string) (KV, error) { +func (l *legacy) Open(ns string) (storage.Storage, error) { return l.open(ns) } @@ -129,7 +131,7 @@ type legacyKV struct { ns []byte } -func (l *legacyKV) Get(key string) ([]byte, error) { +func (l *legacyKV) Get(_ context.Context, key string) ([]byte, error) { var val []byte if err := l.db.View(func(tx *bbolt.Tx) error { @@ -140,18 +142,18 @@ func (l *legacyKV) Get(key string) ([]byte, error) { } if val == nil { - return nil, ErrNotFound + return nil, storage.ErrNotFound } return val, nil } -func (l *legacyKV) Set(key string, value []byte) error { +func (l *legacyKV) Set(_ context.Context, key string, value []byte) error { return l.db.Update(func(tx *bbolt.Tx) error { return tx.Bucket(l.ns).Put([]byte(key), value) }) } -func (l *legacyKV) Delete(key string) error { +func (l *legacyKV) Delete(_ context.Context, key string) error { return l.db.Update(func(tx *bbolt.Tx) error { return tx.Bucket(l.ns).Delete([]byte(key)) }) diff --git a/pkg/storage/peers.go b/pkg/storage/peers.go deleted file mode 100644 index 752231f..0000000 --- a/pkg/storage/peers.go +++ /dev/null @@ -1,94 +0,0 @@ -package storage - -import ( - "context" - "encoding/json" - "errors" - "strconv" - - "github.com/gotd/td/telegram/peers" - - "github.com/iyear/tdl/pkg/key" - "github.com/iyear/tdl/pkg/kv" -) - -type Peers struct { - kv kv.KV -} - -func NewPeers(kv kv.KV) peers.Storage { - return &Peers{kv: kv} -} - -func (p *Peers) Save(_ context.Context, _key peers.Key, value peers.Value) error { - bytes, err := json.Marshal(value) - if err != nil { - return err - } - - return p.kv.Set(key.PeersKey(_key), bytes) -} - -func (p *Peers) Find(_ context.Context, _key peers.Key) (peers.Value, bool, error) { - data, err := p.kv.Get(key.PeersKey(_key)) - if err != nil { - if errors.Is(err, kv.ErrNotFound) { - return peers.Value{}, false, nil - } - return peers.Value{}, false, err - } - - var value peers.Value - if err = json.Unmarshal(data, &value); err != nil { - return peers.Value{}, false, err - } - - return value, true, nil -} - -func (p *Peers) SavePhone(_ context.Context, phone string, _key peers.Key) error { - bytes, err := json.Marshal(_key) - if err != nil { - return err - } - - return p.kv.Set(key.PeersPhone(phone), bytes) -} - -func (p *Peers) FindPhone(ctx context.Context, phone string) (peers.Key, peers.Value, bool, error) { - data, err := p.kv.Get(key.PeersPhone(phone)) - if err != nil { - if errors.Is(err, kv.ErrNotFound) { - return peers.Key{}, peers.Value{}, false, nil - } - return peers.Key{}, peers.Value{}, false, err - } - - var _key peers.Key - if err = json.Unmarshal(data, &_key); err != nil { - return peers.Key{}, peers.Value{}, false, err - } - - value, found, err := p.Find(ctx, _key) - if err != nil { - return peers.Key{}, peers.Value{}, false, err - } - - return _key, value, found, nil -} - -func (p *Peers) GetContactsHash(_ context.Context) (int64, error) { - data, err := p.kv.Get(key.PeersContactsHash()) - if err != nil { - if errors.Is(err, kv.ErrNotFound) { - return 0, nil - } - return 0, err - } - - return strconv.ParseInt(string(data), 10, 64) -} - -func (p *Peers) SaveContactsHash(_ context.Context, hash int64) error { - return p.kv.Set(key.PeersContactsHash(), []byte(strconv.FormatInt(hash, 10))) -} diff --git a/pkg/storage/session.go b/pkg/storage/session.go deleted file mode 100644 index 1ae08f0..0000000 --- a/pkg/storage/session.go +++ /dev/null @@ -1,39 +0,0 @@ -package storage - -import ( - "context" - "errors" - - "github.com/gotd/td/telegram" - - "github.com/iyear/tdl/pkg/key" - "github.com/iyear/tdl/pkg/kv" -) - -type Session struct { - kv kv.KV - login bool -} - -func NewSession(kv kv.KV, login bool) telegram.SessionStorage { - return &Session{kv: kv, login: login} -} - -func (s *Session) LoadSession(_ context.Context) ([]byte, error) { - if s.login { - return nil, nil - } - - b, err := s.kv.Get(key.Session()) - if err != nil { - if errors.Is(err, kv.ErrNotFound) { - return nil, nil - } - return nil, err - } - return b, nil -} - -func (s *Session) StoreSession(_ context.Context, data []byte) error { - return s.kv.Set(key.Session(), data) -} diff --git a/pkg/tclient/tclient.go b/pkg/tclient/tclient.go index aa2cc07..73c7722 100644 --- a/pkg/tclient/tclient.go +++ b/pkg/tclient/tclient.go @@ -8,22 +8,21 @@ import ( "github.com/go-faster/errors" "github.com/gotd/td/telegram" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/tclient" "github.com/iyear/tdl/pkg/key" - "github.com/iyear/tdl/pkg/kv" - "github.com/iyear/tdl/pkg/storage" ) type Options struct { - KV kv.KV + KV storage.Storage Proxy string NTP string ReconnectTimeout time.Duration UpdateHandler telegram.UpdateHandler } -func GetApp(kv kv.KV) (App, error) { - mode, err := kv.Get(key.App()) +func GetApp(kv storage.Storage) (App, error) { + mode, err := kv.Get(context.TODO(), key.App()) if err != nil { mode = []byte(AppBuiltin) } diff --git a/pkg/tmessage/files.go b/pkg/tmessage/files.go index d8bef22..c7ea36d 100644 --- a/pkg/tmessage/files.go +++ b/pkg/tmessage/files.go @@ -15,9 +15,8 @@ import ( "github.com/iyear/tdl/core/dcpool" "github.com/iyear/tdl/core/logctx" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/util/tutil" - "github.com/iyear/tdl/pkg/kv" - "github.com/iyear/tdl/pkg/storage" ) const ( @@ -36,7 +35,7 @@ type fMessage struct { Text interface{} `mapstructure:"text"` } -func FromFile(ctx context.Context, pool dcpool.Pool, kvd kv.KV, files []string, onlyMedia bool) ParseSource { +func FromFile(ctx context.Context, pool dcpool.Pool, kvd storage.Storage, files []string, onlyMedia bool) ParseSource { return func() ([]*Dialog, error) { dialogs := make([]*Dialog, 0, len(files)) @@ -56,7 +55,7 @@ func FromFile(ctx context.Context, pool dcpool.Pool, kvd kv.KV, files []string, } } -func parseFile(ctx context.Context, client *tg.Client, kvd kv.KV, file string, onlyMedia bool) (*Dialog, error) { +func parseFile(ctx context.Context, client *tg.Client, kvd storage.Storage, file string, onlyMedia bool) (*Dialog, error) { f, err := os.Open(file) if err != nil { return nil, err @@ -118,7 +117,7 @@ func collect(ctx context.Context, r io.Reader, peer peers.Peer, onlyMedia bool) return m, nil } -func getChatInfo(ctx context.Context, client *tg.Client, kvd kv.KV, r io.Reader) (peers.Peer, error) { +func getChatInfo(ctx context.Context, client *tg.Client, kvd storage.Storage, r io.Reader) (peers.Peer, error) { d := jstream.NewDecoder(r, 1).EmitKV() chatID := int64(0) diff --git a/pkg/tmessage/urls.go b/pkg/tmessage/urls.go index 1fc5877..9b21b78 100644 --- a/pkg/tmessage/urls.go +++ b/pkg/tmessage/urls.go @@ -8,12 +8,11 @@ import ( "github.com/iyear/tdl/core/dcpool" "github.com/iyear/tdl/core/logctx" + "github.com/iyear/tdl/core/storage" "github.com/iyear/tdl/core/util/tutil" - "github.com/iyear/tdl/pkg/kv" - "github.com/iyear/tdl/pkg/storage" ) -func FromURL(ctx context.Context, pool dcpool.Pool, kvd kv.KV, urls []string) ParseSource { +func FromURL(ctx context.Context, pool dcpool.Pool, kvd storage.Storage, urls []string) ParseSource { return func() ([]*Dialog, error) { manager := peers.Options{Storage: storage.NewPeers(kvd)}. Build(pool.Default(ctx)) diff --git a/test/testserver/testserver.go b/test/testserver/testserver.go index 061e727..7dc3a04 100644 --- a/test/testserver/testserver.go +++ b/test/testserver/testserver.go @@ -18,9 +18,9 @@ import ( "github.com/gotd/td/tg" "github.com/iyear/tdl/core/dcpool" + "github.com/iyear/tdl/core/storage" tclientcore "github.com/iyear/tdl/core/tclient" "github.com/iyear/tdl/pkg/kv" - "github.com/iyear/tdl/pkg/storage" "github.com/iyear/tdl/pkg/tclient" )