refactor(core): move storage package to core module (#813)

* refactor(core): move storage package to core module

* rename

* lint
This commit is contained in:
Junyu Liu 2024-11-22 17:41:33 +08:00 committed by GitHub
parent 0d574733f2
commit eee2d1c546
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 296 additions and 290 deletions

View File

@ -18,11 +18,10 @@ import (
"github.com/jedib0t/go-pretty/v6/progress"
"go.uber.org/multierr"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/tmedia"
"github.com/iyear/tdl/core/util/tutil"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/texpr"
)
@ -54,7 +53,7 @@ type Message struct {
// ENUM(time, id, last)
type ExportType int
func Export(ctx context.Context, c *telegram.Client, kvd kv.KV, opts ExportOptions) (rerr error) {
func Export(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts ExportOptions) (rerr error) {
// only output available fields
if opts.Filter == "-" {
fg := texpr.NewFieldsGetter(nil)

View File

@ -18,9 +18,8 @@ import (
"go.uber.org/zap"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/util/tutil"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/texpr"
)
@ -56,7 +55,7 @@ type ListOptions struct {
Filter string
}
func List(ctx context.Context, c *telegram.Client, kvd kv.KV, opts ListOptions) error {
func List(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts ListOptions) error {
log := logctx.From(ctx)
// align output

View File

@ -18,10 +18,9 @@ import (
"github.com/jedib0t/go-pretty/v6/progress"
"go.uber.org/multierr"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/util/tutil"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/storage"
)
type UsersOptions struct {
@ -38,7 +37,7 @@ type User struct {
LastName string `json:"last_name"`
}
func Users(ctx context.Context, c *telegram.Client, kvd kv.KV, opts UsersOptions) (rerr error) {
func Users(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts UsersOptions) (rerr error) {
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(c.API())
if opts.Chat == "" {
return fmt.Errorf("missing domain id")

View File

@ -17,12 +17,11 @@ import (
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/core/downloader"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/tclient"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/key"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/tmessage"
"github.com/iyear/tdl/pkg/utils"
)
@ -53,7 +52,7 @@ type parser struct {
Parser tmessage.ParseSource
}
func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr error) {
func Run(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts Options) (rerr error) {
pool := dcpool.NewPool(c,
int64(viper.GetInt(consts.FlagPoolSize)),
tclient.NewDefaultMiddlewares(ctx, viper.GetDuration(consts.FlagReconnectTimeout))...)
@ -94,7 +93,7 @@ func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr
if rerr != nil { // download is interrupted
multierr.AppendInto(&rerr, saveProgress(ctx, kvd, it))
} else { // if finished, we should clear resume key
multierr.AppendInto(&rerr, kvd.Delete(key.Resume(it.Fingerprint())))
multierr.AppendInto(&rerr, kvd.Delete(ctx, key.Resume(it.Fingerprint())))
}
}()
@ -137,12 +136,12 @@ func collectDialogs(parsers []parser) ([][]*tmessage.Dialog, error) {
return dialogs, nil
}
func resume(ctx context.Context, kvd kv.KV, iter *iter, ask bool) error {
func resume(ctx context.Context, kvd storage.Storage, iter *iter, ask bool) error {
logctx.From(ctx).Debug("Check resume key",
zap.String("fingerprint", iter.Fingerprint()))
b, err := kvd.Get(key.Resume(iter.Fingerprint()))
if err != nil && !errors.Is(err, kv.ErrNotFound) {
b, err := kvd.Get(ctx, key.Resume(iter.Fingerprint()))
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}
if len(b) == 0 { // no progress
@ -177,14 +176,14 @@ func resume(ctx context.Context, kvd kv.KV, iter *iter, ask bool) error {
if !confirm {
// clear resume key
return kvd.Delete(key.Resume(iter.Fingerprint()))
return kvd.Delete(ctx, key.Resume(iter.Fingerprint()))
}
iter.SetFinished(finished)
return nil
}
func saveProgress(ctx context.Context, kvd kv.KV, it *iter) error {
func saveProgress(ctx context.Context, kvd storage.Storage, it *iter) error {
finished := it.Finished()
logctx.From(ctx).Debug("Save progress",
zap.Int("finished", len(finished)))
@ -193,5 +192,5 @@ func saveProgress(ctx context.Context, kvd kv.KV, it *iter) error {
if err != nil {
return err
}
return kvd.Set(key.Resume(it.Fingerprint()), b)
return kvd.Set(ctx, key.Resume(it.Fingerprint()), b)
}

View File

@ -22,11 +22,10 @@ import (
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/tmedia"
"github.com/iyear/tdl/core/util/tutil"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/tmessage"
)
@ -39,7 +38,7 @@ type media struct {
var tmpl string
func serve(ctx context.Context,
kvd kv.KV,
kvd storage.Storage,
pool dcpool.Pool,
dialogs [][]*tmessage.Dialog,
port int,

View File

@ -19,12 +19,11 @@ import (
"github.com/iyear/tdl/app/internal/tctx"
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/core/forwarder"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/tclient"
"github.com/iyear/tdl/core/util/tutil"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/texpr"
"github.com/iyear/tdl/pkg/tmessage"
)
@ -40,7 +39,7 @@ type Options struct {
Desc bool
}
func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr error) {
func Run(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts Options) (rerr error) {
if opts.To == "-" || opts.Edit == "-" {
fg := texpr.NewFieldsGetter(nil)

View File

@ -4,16 +4,16 @@ import (
"context"
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/core/storage"
)
type kvKey struct{}
func KV(ctx context.Context) kv.KV {
return ctx.Value(kvKey{}).(kv.KV)
func KV(ctx context.Context) storage.Storage {
return ctx.Value(kvKey{}).(storage.Storage)
}
func WithKV(ctx context.Context, kv kv.KV) context.Context {
func WithKV(ctx context.Context, kv storage.Storage) context.Context {
return context.WithValue(ctx, kvKey{}, kv)
}

View File

@ -23,7 +23,7 @@ func Code(ctx context.Context) error {
return errors.Wrap(err, "open kv")
}
if err = kvd.Set(key.App(), []byte(tclient.AppDesktop)); err != nil {
if err = kvd.Set(ctx, key.App(), []byte(tclient.AppDesktop)); err != nil {
return errors.Wrap(err, "set app")
}

View File

@ -14,11 +14,11 @@ import (
tdtdesktop "github.com/gotd/td/session/tdesktop"
"github.com/spf13/viper"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/util/fsutil"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/key"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/tclient"
"github.com/iyear/tdl/pkg/tdesktop"
"github.com/iyear/tdl/pkg/tpath"
@ -74,7 +74,7 @@ func Desktop(ctx context.Context, opts Options) error {
return err
}
if err = kvd.Set(key.App(), []byte(tclient.AppDesktop)); err != nil {
if err = kvd.Set(ctx, key.App(), []byte(tclient.AppDesktop)); err != nil {
return err
}

View File

@ -27,7 +27,7 @@ func QR(ctx context.Context) error {
return errors.Wrap(err, "open kv")
}
if err = kvd.Set(key.App(), []byte(tclient.AppDesktop)); err != nil {
if err = kvd.Set(ctx, key.App(), []byte(tclient.AppDesktop)); err != nil {
return errors.Wrap(err, "set app")
}

View File

@ -12,13 +12,12 @@ import (
"go.uber.org/multierr"
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/tclient"
"github.com/iyear/tdl/core/uploader"
"github.com/iyear/tdl/core/util/tutil"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/utils"
)
@ -30,7 +29,7 @@ type Options struct {
Photo bool
}
func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr error) {
func Run(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts Options) (rerr error) {
files, err := walk(opts.Paths, opts.Excludes)
if err != nil {
return err

View File

@ -14,7 +14,7 @@ import (
"github.com/iyear/tdl/app/chat"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/core/storage"
)
var limiter = ratelimit.New(rate.Every(500*time.Millisecond), 2)
@ -38,7 +38,7 @@ func NewChatList() *cobra.Command {
Use: "ls",
Short: "List your chats",
RunE: func(cmd *cobra.Command, args []string) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error {
return chat.List(logctx.Named(ctx, "ls"), c, kvd, opts)
}, limiter)
},
@ -83,7 +83,7 @@ func NewChatExport() *cobra.Command {
return fmt.Errorf("unknown export type: %s", opts.Type)
}
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error {
return chat.Export(logctx.Named(ctx, "export"), c, kvd, opts)
}, limiter)
},
@ -138,7 +138,7 @@ func NewChatUsers() *cobra.Command {
Use: "users",
Short: "export users from (protected) channels",
RunE: func(cmd *cobra.Command, args []string) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error {
return chat.Users(logctx.Named(ctx, "users"), c, kvd, opts)
}, limiter)
},

View File

@ -10,8 +10,8 @@ import (
"github.com/iyear/tdl/app/dl"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/kv"
)
func NewDownload() *cobra.Command {
@ -29,7 +29,7 @@ func NewDownload() *cobra.Command {
opts.Template = viper.GetString(consts.FlagDlTemplate)
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error {
return dl.Run(logctx.Named(ctx, "dl"), c, kvd, opts)
})
},

View File

@ -12,10 +12,10 @@ import (
"github.com/spf13/viper"
"github.com/iyear/tdl/app/extension"
"github.com/iyear/tdl/core/storage"
extbase "github.com/iyear/tdl/extension"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/extensions"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/tclient"
)

View File

@ -11,7 +11,7 @@ import (
"github.com/iyear/tdl/app/forward"
"github.com/iyear/tdl/core/forwarder"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/core/storage"
)
func NewForward() *cobra.Command {
@ -22,7 +22,7 @@ func NewForward() *cobra.Command {
Short: "Forward messages with automatic fallback and message routing",
GroupID: groupTools.ID,
RunE: func(cmd *cobra.Command, args []string) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error {
return forward.Run(logctx.Named(ctx, "forward"), c, kvd, opts)
})
},

View File

@ -19,6 +19,7 @@ import (
"golang.org/x/net/proxy"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/core/storage"
tclientcore "github.com/iyear/tdl/core/tclient"
"github.com/iyear/tdl/core/util/fsutil"
"github.com/iyear/tdl/core/util/logutil"
@ -91,12 +92,12 @@ func New() *cobra.Command {
}
}
storage, err := kv.NewWithMap(viper.GetStringMapString(consts.FlagStorage))
stg, err := kv.NewWithMap(viper.GetStringMapString(consts.FlagStorage))
if err != nil {
return errors.Wrap(err, "create kv storage")
}
cmd.SetContext(kv.With(cmd.Context(), storage))
cmd.SetContext(kv.With(cmd.Context(), stg))
// extension manager client proxy
var dialer proxy.ContextDialer = proxy.Direct
@ -228,7 +229,7 @@ func tOptions(ctx context.Context) (tclient.Options, error) {
return o, nil
}
func tRun(ctx context.Context, f func(ctx context.Context, c *telegram.Client, kvd kv.KV) error, middlewares ...telegram.Middleware) error {
func tRun(ctx context.Context, f func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error, middlewares ...telegram.Middleware) error {
o, err := tOptions(ctx)
if err != nil {
return errors.Wrap(err, "build telegram options")

View File

@ -8,7 +8,7 @@ import (
"github.com/iyear/tdl/app/up"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/core/storage"
)
func NewUpload() *cobra.Command {
@ -20,7 +20,7 @@ func NewUpload() *cobra.Command {
Short: "Upload anything to Telegram",
GroupID: groupTools.ID,
RunE: func(cmd *cobra.Command, args []string) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return tRun(cmd.Context(), func(ctx context.Context, c *telegram.Client, kvd storage.Storage) error {
return up.Run(logctx.Named(ctx, "up"), c, kvd, opts)
})
},

View File

@ -0,0 +1,25 @@
package keygen
import (
"bytes"
"strings"
"sync"
)
var keyPool = sync.Pool{
New: func() interface{} {
b := &bytes.Buffer{}
b.Grow(16)
return b
},
}
func New(indexes ...string) string {
buf := keyPool.Get().(*bytes.Buffer)
buf.WriteString(strings.Join(indexes, ":"))
t := buf.String()
buf.Reset()
keyPool.Put(buf)
return t
}

105
core/storage/peers.go Normal file
View File

@ -0,0 +1,105 @@
package storage
import (
"context"
"encoding/json"
"errors"
"strconv"
"github.com/gotd/td/telegram/peers"
"github.com/iyear/tdl/core/storage/keygen"
)
type Peers struct {
kv Storage
}
func NewPeers(kv Storage) peers.Storage {
return &Peers{kv: kv}
}
func (p *Peers) Save(ctx context.Context, key peers.Key, value peers.Value) error {
bytes, err := json.Marshal(value)
if err != nil {
return err
}
return p.kv.Set(ctx, p.key(key), bytes)
}
func (p *Peers) Find(ctx context.Context, key peers.Key) (peers.Value, bool, error) {
data, err := p.kv.Get(ctx, p.key(key))
if err != nil {
if errors.Is(err, ErrNotFound) {
return peers.Value{}, false, nil
}
return peers.Value{}, false, err
}
var value peers.Value
if err = json.Unmarshal(data, &value); err != nil {
return peers.Value{}, false, err
}
return value, true, nil
}
func (p *Peers) SavePhone(ctx context.Context, phone string, _key peers.Key) error {
bytes, err := json.Marshal(_key)
if err != nil {
return err
}
return p.kv.Set(ctx, p.phoneKey(phone), bytes)
}
func (p *Peers) FindPhone(ctx context.Context, phone string) (peers.Key, peers.Value, bool, error) {
data, err := p.kv.Get(ctx, p.phoneKey(phone))
if err != nil {
if errors.Is(err, ErrNotFound) {
return peers.Key{}, peers.Value{}, false, nil
}
return peers.Key{}, peers.Value{}, false, err
}
var _key peers.Key
if err = json.Unmarshal(data, &_key); err != nil {
return peers.Key{}, peers.Value{}, false, err
}
value, found, err := p.Find(ctx, _key)
if err != nil {
return peers.Key{}, peers.Value{}, false, err
}
return _key, value, found, nil
}
func (p *Peers) GetContactsHash(ctx context.Context) (int64, error) {
data, err := p.kv.Get(ctx, p.contactsKey())
if err != nil {
if errors.Is(err, ErrNotFound) {
return 0, nil
}
return 0, err
}
return strconv.ParseInt(string(data), 10, 64)
}
func (p *Peers) SaveContactsHash(ctx context.Context, hash int64) error {
return p.kv.Set(ctx, p.contactsKey(), []byte(strconv.FormatInt(hash, 10)))
}
func (p *Peers) key(key peers.Key) string {
return keygen.New("peers", "key", key.Prefix, strconv.FormatInt(key.ID, 10))
}
func (p *Peers) phoneKey(phone string) string {
return keygen.New("peers", "phone", phone)
}
func (p *Peers) contactsKey() string {
return keygen.New("peers", "contacts", "hash")
}

42
core/storage/session.go Normal file
View File

@ -0,0 +1,42 @@
package storage
import (
"context"
"errors"
"github.com/gotd/td/telegram"
"github.com/iyear/tdl/core/storage/keygen"
)
type Session struct {
kv Storage
login bool
}
func NewSession(kv Storage, login bool) telegram.SessionStorage {
return &Session{kv: kv, login: login}
}
func (s *Session) LoadSession(ctx context.Context) ([]byte, error) {
if s.login {
return nil, nil
}
b, err := s.kv.Get(ctx, s.key())
if err != nil {
if errors.Is(err, ErrNotFound) {
return nil, nil
}
return nil, err
}
return b, nil
}
func (s *Session) StoreSession(ctx context.Context, data []byte) error {
return s.kv.Set(ctx, s.key(), data)
}
func (s *Session) key() string {
return keygen.New("session")
}

View File

@ -4,23 +4,23 @@ import (
"context"
"encoding/json"
"errors"
"strconv"
"github.com/gotd/td/telegram/updates"
"github.com/iyear/tdl/pkg/key"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/core/storage/keygen"
)
type State struct {
kv kv.KV
kv Storage
}
func NewState(kv kv.KV) updates.StateStorage {
func NewState(kv Storage) updates.StateStorage {
return &State{kv: kv}
}
func (s *State) Get(_ context.Context, key string, v interface{}) error {
data, err := s.kv.Get(key)
func (s *State) Get(ctx context.Context, key string, v interface{}) error {
data, err := s.kv.Get(ctx, key)
if err != nil {
return err
}
@ -28,20 +28,20 @@ func (s *State) Get(_ context.Context, key string, v interface{}) error {
return json.Unmarshal(data, v)
}
func (s *State) Set(_ context.Context, key string, v interface{}) error {
func (s *State) Set(ctx context.Context, key string, v interface{}) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
return s.kv.Set(key, data)
return s.kv.Set(ctx, key, data)
}
func (s *State) GetState(ctx context.Context, userID int64) (updates.State, bool, error) {
state := updates.State{}
if err := s.Get(ctx, key.State(userID), &state); err != nil {
if errors.Is(err, kv.ErrNotFound) {
if err := s.Get(ctx, s.stateKey(userID), &state); err != nil {
if errors.Is(err, ErrNotFound) {
return state, false, nil
}
return state, false, err
@ -51,15 +51,15 @@ func (s *State) GetState(ctx context.Context, userID int64) (updates.State, bool
}
func (s *State) SetState(ctx context.Context, userID int64, state updates.State) error {
if err := s.Set(ctx, key.State(userID), state); err != nil {
if err := s.Set(ctx, s.stateKey(userID), state); err != nil {
return err
}
return s.Set(ctx, key.StateChannel(userID), struct{}{})
return s.Set(ctx, s.channelKey(userID), struct{}{})
}
func (s *State) SetPts(ctx context.Context, userID int64, pts int) error {
state, k := updates.State{}, key.State(userID)
state, k := updates.State{}, s.stateKey(userID)
if err := s.Get(ctx, k, &state); err != nil {
return err
@ -69,7 +69,7 @@ func (s *State) SetPts(ctx context.Context, userID int64, pts int) error {
}
func (s *State) SetQts(ctx context.Context, userID int64, qts int) error {
state, k := updates.State{}, key.State(userID)
state, k := updates.State{}, s.stateKey(userID)
if err := s.Get(ctx, k, &state); err != nil {
return err
@ -79,7 +79,7 @@ func (s *State) SetQts(ctx context.Context, userID int64, qts int) error {
}
func (s *State) SetDate(ctx context.Context, userID int64, date int) error {
state, k := updates.State{}, key.State(userID)
state, k := updates.State{}, s.stateKey(userID)
if err := s.Get(ctx, k, &state); err != nil {
return err
@ -89,7 +89,7 @@ func (s *State) SetDate(ctx context.Context, userID int64, date int) error {
}
func (s *State) SetSeq(ctx context.Context, userID int64, seq int) error {
state, k := updates.State{}, key.State(userID)
state, k := updates.State{}, s.stateKey(userID)
if err := s.Get(ctx, k, &state); err != nil {
return err
@ -99,7 +99,7 @@ func (s *State) SetSeq(ctx context.Context, userID int64, seq int) error {
}
func (s *State) SetDateSeq(ctx context.Context, userID int64, date, seq int) error {
state, k := updates.State{}, key.State(userID)
state, k := updates.State{}, s.stateKey(userID)
if err := s.Get(ctx, k, &state); err != nil {
return err
@ -112,8 +112,8 @@ func (s *State) SetDateSeq(ctx context.Context, userID int64, date, seq int) err
func (s *State) GetChannelPts(ctx context.Context, userID, channelID int64) (int, bool, error) {
c := make(map[int64]int)
if err := s.Get(ctx, key.StateChannel(userID), &c); err != nil {
if errors.Is(err, kv.ErrNotFound) {
if err := s.Get(ctx, s.channelKey(userID), &c); err != nil {
if errors.Is(err, ErrNotFound) {
return 0, false, nil
}
return 0, false, err
@ -128,7 +128,7 @@ func (s *State) GetChannelPts(ctx context.Context, userID, channelID int64) (int
}
func (s *State) SetChannelPts(ctx context.Context, userID, channelID int64, pts int) error {
c, k := make(map[int64]int), key.StateChannel(userID)
c, k := make(map[int64]int), s.channelKey(userID)
if err := s.Get(ctx, k, &c); err != nil {
return err
@ -140,7 +140,7 @@ func (s *State) SetChannelPts(ctx context.Context, userID, channelID int64, pts
func (s *State) ForEachChannels(ctx context.Context, userID int64, f func(ctx context.Context, channelID int64, pts int) error) error {
c := make(map[int64]int)
if err := s.Get(ctx, key.StateChannel(userID), &c); err != nil {
if err := s.Get(ctx, s.channelKey(userID), &c); err != nil {
return err
}
@ -152,3 +152,11 @@ func (s *State) ForEachChannels(ctx context.Context, userID int64, f func(ctx co
return nil
}
func (s *State) stateKey(userID int64) string {
return keygen.New("state", strconv.FormatInt(userID, 10))
}
func (s *State) channelKey(userID int64) string {
return keygen.New("chan", strconv.FormatInt(userID, 10))
}

15
core/storage/storage.go Normal file
View File

@ -0,0 +1,15 @@
package storage
import (
"context"
"github.com/go-faster/errors"
)
type Storage interface {
Get(ctx context.Context, key string) ([]byte, error)
Set(ctx context.Context, key string, value []byte) error
Delete(ctx context.Context, key string) error
}
var ErrNotFound = errors.New("key not found")

View File

@ -1,60 +1,13 @@
package key
import (
"bytes"
"strconv"
"strings"
"sync"
"github.com/gotd/td/telegram/peers"
"github.com/iyear/tdl/core/storage/keygen"
)
var keyPool = sync.Pool{
New: func() interface{} {
b := &bytes.Buffer{}
b.Grow(16)
return b
},
}
func New(indexes ...string) string {
buf := keyPool.Get().(*bytes.Buffer)
buf.WriteString(strings.Join(indexes, ":"))
t := buf.String()
buf.Reset()
keyPool.Put(buf)
return t
}
func Session() string {
return New("session")
}
func App() string {
return New("app")
}
func State(userID int64) string {
return New("state", strconv.FormatInt(userID, 10))
}
func StateChannel(userID int64) string {
return New("chan", strconv.FormatInt(userID, 10))
}
func PeersKey(key peers.Key) string {
return New("peers", "key", key.Prefix, strconv.FormatInt(key.ID, 10))
}
func PeersPhone(phone string) string {
return New("peers", "phone", phone)
}
func PeersContactsHash() string {
return New("peers", "contacts", "hash")
return keygen.New("app")
}
func Resume(fingerprint string) string {
return New("resume", fingerprint)
return keygen.New("resume", fingerprint)
}

View File

@ -10,6 +10,7 @@ import (
"go.etcd.io/bbolt"
"go.uber.org/multierr"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/pkg/validator"
)
@ -128,7 +129,7 @@ func (b *bolt) walk(fn func(path string) error) error {
})
}
func (b *bolt) Open(ns string) (KV, error) {
func (b *bolt) Open(ns string) (storage.Storage, error) {
return b.open(ns)
}

View File

@ -1,6 +1,7 @@
package kv
import (
"context"
"encoding/json"
"os"
"path/filepath"
@ -9,6 +10,7 @@ import (
"github.com/go-faster/errors"
"github.com/mitchellh/mapstructure"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/pkg/validator"
)
@ -84,7 +86,7 @@ func (f *file) Namespaces() ([]string, error) {
return namespaces, nil
}
func (f *file) Open(ns string) (KV, error) {
func (f *file) Open(ns string) (storage.Storage, error) {
if ns == "" {
return nil, errors.New("namespace is required")
}
@ -142,7 +144,7 @@ type fileKV struct {
ns string
}
func (f *fileKV) Get(key string) ([]byte, error) {
func (f *fileKV) Get(_ context.Context, key string) ([]byte, error) {
m, err := f.f.read()
if err != nil {
return nil, errors.Wrap(err, "read")
@ -151,10 +153,10 @@ func (f *fileKV) Get(key string) ([]byte, error) {
if v, ok := m[f.ns][key]; ok {
return v, nil
}
return nil, ErrNotFound
return nil, storage.ErrNotFound
}
func (f *fileKV) Set(key string, value []byte) error {
func (f *fileKV) Set(_ context.Context, key string, value []byte) error {
m, err := f.f.read()
if err != nil {
return errors.Wrap(err, "read")
@ -165,7 +167,7 @@ func (f *fileKV) Set(key string, value []byte) error {
return f.f.write(m)
}
func (f *fileKV) Delete(key string) error {
func (f *fileKV) Delete(_ context.Context, key string) error {
m, err := f.f.read()
if err != nil {
return errors.Wrap(err, "read")

View File

@ -5,6 +5,8 @@ import (
"io"
"github.com/go-faster/errors"
"github.com/iyear/tdl/core/storage"
)
//go:generate go-enum --values --names --flag --nocase
@ -15,8 +17,6 @@ type Driver string
const DriverTypeKey = "type"
var ErrNotFound = errors.New("key not found")
type Meta map[string]map[string][]byte // namespace, key, value
type Storage interface {
@ -24,16 +24,10 @@ type Storage interface {
MigrateTo() (Meta, error)
MigrateFrom(Meta) error
Namespaces() ([]string, error)
Open(ns string) (KV, error)
Open(ns string) (storage.Storage, error)
io.Closer
}
type KV interface {
Get(key string) ([]byte, error)
Set(key string, value []byte) error
Delete(key string) error
}
var drivers = map[Driver]func(map[string]any) (Storage, error){}
func register(name Driver, fn func(map[string]any) (Storage, error)) {

View File

@ -1,6 +1,7 @@
package kv
import (
"context"
"fmt"
"path/filepath"
"testing"
@ -113,7 +114,7 @@ func TestStorage_MigrateTo(t *testing.T) {
require.NotNil(t, kv)
for key, value := range pairs {
require.NoError(t, kv.Set(key, value))
require.NoError(t, kv.Set(context.TODO(), key, value))
}
}
@ -146,7 +147,7 @@ func TestStorage_MigrateFrom(t *testing.T) {
require.NotNil(t, kv)
for key, value := range pairs {
v, err := kv.Get(key)
v, err := kv.Get(context.TODO(), key)
require.NoError(t, err)
require.Equal(t, value, v)
}

View File

@ -1,6 +1,7 @@
package kv
import (
"context"
"os"
"time"
@ -8,6 +9,7 @@ import (
"github.com/mitchellh/mapstructure"
"go.etcd.io/bbolt"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/pkg/validator"
)
@ -102,7 +104,7 @@ func (l *legacy) Namespaces() ([]string, error) {
return namespaces, nil
}
func (l *legacy) Open(ns string) (KV, error) {
func (l *legacy) Open(ns string) (storage.Storage, error) {
return l.open(ns)
}
@ -129,7 +131,7 @@ type legacyKV struct {
ns []byte
}
func (l *legacyKV) Get(key string) ([]byte, error) {
func (l *legacyKV) Get(_ context.Context, key string) ([]byte, error) {
var val []byte
if err := l.db.View(func(tx *bbolt.Tx) error {
@ -140,18 +142,18 @@ func (l *legacyKV) Get(key string) ([]byte, error) {
}
if val == nil {
return nil, ErrNotFound
return nil, storage.ErrNotFound
}
return val, nil
}
func (l *legacyKV) Set(key string, value []byte) error {
func (l *legacyKV) Set(_ context.Context, key string, value []byte) error {
return l.db.Update(func(tx *bbolt.Tx) error {
return tx.Bucket(l.ns).Put([]byte(key), value)
})
}
func (l *legacyKV) Delete(key string) error {
func (l *legacyKV) Delete(_ context.Context, key string) error {
return l.db.Update(func(tx *bbolt.Tx) error {
return tx.Bucket(l.ns).Delete([]byte(key))
})

View File

@ -1,94 +0,0 @@
package storage
import (
"context"
"encoding/json"
"errors"
"strconv"
"github.com/gotd/td/telegram/peers"
"github.com/iyear/tdl/pkg/key"
"github.com/iyear/tdl/pkg/kv"
)
type Peers struct {
kv kv.KV
}
func NewPeers(kv kv.KV) peers.Storage {
return &Peers{kv: kv}
}
func (p *Peers) Save(_ context.Context, _key peers.Key, value peers.Value) error {
bytes, err := json.Marshal(value)
if err != nil {
return err
}
return p.kv.Set(key.PeersKey(_key), bytes)
}
func (p *Peers) Find(_ context.Context, _key peers.Key) (peers.Value, bool, error) {
data, err := p.kv.Get(key.PeersKey(_key))
if err != nil {
if errors.Is(err, kv.ErrNotFound) {
return peers.Value{}, false, nil
}
return peers.Value{}, false, err
}
var value peers.Value
if err = json.Unmarshal(data, &value); err != nil {
return peers.Value{}, false, err
}
return value, true, nil
}
func (p *Peers) SavePhone(_ context.Context, phone string, _key peers.Key) error {
bytes, err := json.Marshal(_key)
if err != nil {
return err
}
return p.kv.Set(key.PeersPhone(phone), bytes)
}
func (p *Peers) FindPhone(ctx context.Context, phone string) (peers.Key, peers.Value, bool, error) {
data, err := p.kv.Get(key.PeersPhone(phone))
if err != nil {
if errors.Is(err, kv.ErrNotFound) {
return peers.Key{}, peers.Value{}, false, nil
}
return peers.Key{}, peers.Value{}, false, err
}
var _key peers.Key
if err = json.Unmarshal(data, &_key); err != nil {
return peers.Key{}, peers.Value{}, false, err
}
value, found, err := p.Find(ctx, _key)
if err != nil {
return peers.Key{}, peers.Value{}, false, err
}
return _key, value, found, nil
}
func (p *Peers) GetContactsHash(_ context.Context) (int64, error) {
data, err := p.kv.Get(key.PeersContactsHash())
if err != nil {
if errors.Is(err, kv.ErrNotFound) {
return 0, nil
}
return 0, err
}
return strconv.ParseInt(string(data), 10, 64)
}
func (p *Peers) SaveContactsHash(_ context.Context, hash int64) error {
return p.kv.Set(key.PeersContactsHash(), []byte(strconv.FormatInt(hash, 10)))
}

View File

@ -1,39 +0,0 @@
package storage
import (
"context"
"errors"
"github.com/gotd/td/telegram"
"github.com/iyear/tdl/pkg/key"
"github.com/iyear/tdl/pkg/kv"
)
type Session struct {
kv kv.KV
login bool
}
func NewSession(kv kv.KV, login bool) telegram.SessionStorage {
return &Session{kv: kv, login: login}
}
func (s *Session) LoadSession(_ context.Context) ([]byte, error) {
if s.login {
return nil, nil
}
b, err := s.kv.Get(key.Session())
if err != nil {
if errors.Is(err, kv.ErrNotFound) {
return nil, nil
}
return nil, err
}
return b, nil
}
func (s *Session) StoreSession(_ context.Context, data []byte) error {
return s.kv.Set(key.Session(), data)
}

View File

@ -8,22 +8,21 @@ import (
"github.com/go-faster/errors"
"github.com/gotd/td/telegram"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/tclient"
"github.com/iyear/tdl/pkg/key"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/storage"
)
type Options struct {
KV kv.KV
KV storage.Storage
Proxy string
NTP string
ReconnectTimeout time.Duration
UpdateHandler telegram.UpdateHandler
}
func GetApp(kv kv.KV) (App, error) {
mode, err := kv.Get(key.App())
func GetApp(kv storage.Storage) (App, error) {
mode, err := kv.Get(context.TODO(), key.App())
if err != nil {
mode = []byte(AppBuiltin)
}

View File

@ -15,9 +15,8 @@ import (
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/util/tutil"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/storage"
)
const (
@ -36,7 +35,7 @@ type fMessage struct {
Text interface{} `mapstructure:"text"`
}
func FromFile(ctx context.Context, pool dcpool.Pool, kvd kv.KV, files []string, onlyMedia bool) ParseSource {
func FromFile(ctx context.Context, pool dcpool.Pool, kvd storage.Storage, files []string, onlyMedia bool) ParseSource {
return func() ([]*Dialog, error) {
dialogs := make([]*Dialog, 0, len(files))
@ -56,7 +55,7 @@ func FromFile(ctx context.Context, pool dcpool.Pool, kvd kv.KV, files []string,
}
}
func parseFile(ctx context.Context, client *tg.Client, kvd kv.KV, file string, onlyMedia bool) (*Dialog, error) {
func parseFile(ctx context.Context, client *tg.Client, kvd storage.Storage, file string, onlyMedia bool) (*Dialog, error) {
f, err := os.Open(file)
if err != nil {
return nil, err
@ -118,7 +117,7 @@ func collect(ctx context.Context, r io.Reader, peer peers.Peer, onlyMedia bool)
return m, nil
}
func getChatInfo(ctx context.Context, client *tg.Client, kvd kv.KV, r io.Reader) (peers.Peer, error) {
func getChatInfo(ctx context.Context, client *tg.Client, kvd storage.Storage, r io.Reader) (peers.Peer, error) {
d := jstream.NewDecoder(r, 1).EmitKV()
chatID := int64(0)

View File

@ -8,12 +8,11 @@ import (
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/util/tutil"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/storage"
)
func FromURL(ctx context.Context, pool dcpool.Pool, kvd kv.KV, urls []string) ParseSource {
func FromURL(ctx context.Context, pool dcpool.Pool, kvd storage.Storage, urls []string) ParseSource {
return func() ([]*Dialog, error) {
manager := peers.Options{Storage: storage.NewPeers(kvd)}.
Build(pool.Default(ctx))

View File

@ -18,9 +18,9 @@ import (
"github.com/gotd/td/tg"
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/core/storage"
tclientcore "github.com/iyear/tdl/core/tclient"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/tclient"
)