tdl/app/dl/dl.go
Junyu Liu eee2d1c546
refactor(core): move storage package to core module (#813)
* refactor(core): move storage package to core module

* rename

* lint
2024-11-22 17:41:33 +08:00

197 lines
4.8 KiB
Go

package dl
import (
"context"
"encoding/json"
"fmt"
"github.com/AlecAivazis/survey/v2"
"github.com/fatih/color"
"github.com/go-faster/errors"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/peers"
"github.com/spf13/viper"
"go.uber.org/multierr"
"go.uber.org/zap"
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/core/downloader"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/tclient"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/key"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/tmessage"
"github.com/iyear/tdl/pkg/utils"
)
type Options struct {
Dir string
RewriteExt bool
SkipSame bool
Template string
URLs []string
Files []string
Include []string
Exclude []string
Desc bool
Takeout bool
Group bool // auto detect grouped message
// resume opts
Continue, Restart bool
// serve
Serve bool
Port int
}
type parser struct {
Data []string
Parser tmessage.ParseSource
}
func Run(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts Options) (rerr error) {
pool := dcpool.NewPool(c,
int64(viper.GetInt(consts.FlagPoolSize)),
tclient.NewDefaultMiddlewares(ctx, viper.GetDuration(consts.FlagReconnectTimeout))...)
defer multierr.AppendInvoke(&rerr, multierr.Close(pool))
parsers := []parser{
{Data: opts.URLs, Parser: tmessage.FromURL(ctx, pool, kvd, opts.URLs)},
{Data: opts.Files, Parser: tmessage.FromFile(ctx, pool, kvd, opts.Files, true)},
}
dialogs, err := collectDialogs(parsers)
if err != nil {
return err
}
logctx.From(ctx).Debug("Collect dialogs",
zap.Any("dialogs", dialogs))
if opts.Serve {
return serve(ctx, kvd, pool, dialogs, opts.Port, opts.Takeout)
}
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))
it, err := newIter(pool, manager, dialogs, opts, viper.GetDuration(consts.FlagDelay))
if err != nil {
return err
}
if !opts.Restart {
// resume download and ask user to continue
if err = resume(ctx, kvd, it, !opts.Continue); err != nil {
return err
}
} else {
color.Yellow("Restart download by 'restart' flag")
}
defer func() { // save progress
if rerr != nil { // download is interrupted
multierr.AppendInto(&rerr, saveProgress(ctx, kvd, it))
} else { // if finished, we should clear resume key
multierr.AppendInto(&rerr, kvd.Delete(ctx, key.Resume(it.Fingerprint())))
}
}()
dlProgress := prog.New(utils.Byte.FormatBinaryBytes)
dlProgress.SetNumTrackersExpected(it.Total())
prog.EnablePS(ctx, dlProgress)
options := downloader.Options{
Pool: pool,
Threads: viper.GetInt(consts.FlagThreads),
Iter: it,
Progress: newProgress(dlProgress, it, opts),
}
limit := viper.GetInt(consts.FlagLimit)
logctx.From(ctx).Info("Start download",
zap.String("dir", opts.Dir),
zap.Bool("rewrite_ext", opts.RewriteExt),
zap.Bool("skip_same", opts.SkipSame),
zap.Int("threads", options.Threads),
zap.Int("limit", limit))
color.Green("All files will be downloaded to '%s' dir", opts.Dir)
go dlProgress.Render()
defer prog.Wait(ctx, dlProgress)
return downloader.New(options).Download(ctx, limit)
}
func collectDialogs(parsers []parser) ([][]*tmessage.Dialog, error) {
var dialogs [][]*tmessage.Dialog
for _, p := range parsers {
d, err := tmessage.Parse(p.Parser)
if err != nil {
return nil, err
}
dialogs = append(dialogs, d)
}
return dialogs, nil
}
func resume(ctx context.Context, kvd storage.Storage, iter *iter, ask bool) error {
logctx.From(ctx).Debug("Check resume key",
zap.String("fingerprint", iter.Fingerprint()))
b, err := kvd.Get(ctx, key.Resume(iter.Fingerprint()))
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}
if len(b) == 0 { // no progress
return nil
}
finished := make(map[int]struct{})
if err = json.Unmarshal(b, &finished); err != nil {
return err
}
// finished is empty, no need to resume
if len(finished) == 0 {
return nil
}
confirm := false
resumeStr := fmt.Sprintf("Found unfinished download, continue from '%d/%d'", len(finished), iter.Total())
if ask {
if err = survey.AskOne(&survey.Confirm{
Message: color.YellowString(resumeStr + "?"),
}, &confirm); err != nil {
return err
}
} else {
color.Yellow(resumeStr)
confirm = true
}
logctx.From(ctx).Debug("Resume download",
zap.Int("finished", len(finished)))
if !confirm {
// clear resume key
return kvd.Delete(ctx, key.Resume(iter.Fingerprint()))
}
iter.SetFinished(finished)
return nil
}
func saveProgress(ctx context.Context, kvd storage.Storage, it *iter) error {
finished := it.Finished()
logctx.From(ctx).Debug("Save progress",
zap.Int("finished", len(finished)))
b, err := json.Marshal(finished)
if err != nil {
return err
}
return kvd.Set(ctx, key.Resume(it.Fingerprint()), b)
}