tdl/app/dl/dl.go

197 lines
4.8 KiB
Go
Raw Permalink Normal View History

2022-09-22 23:41:47 +08:00
package dl
2022-09-01 19:10:08 +08:00
import (
"context"
2023-01-28 16:51:09 +08:00
"encoding/json"
2022-12-21 19:46:27 +08:00
"fmt"
2023-08-20 21:00:06 +08:00
2023-01-28 16:51:09 +08:00
"github.com/AlecAivazis/survey/v2"
2022-12-21 19:46:27 +08:00
"github.com/fatih/color"
"github.com/go-faster/errors"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/peers"
2023-08-20 21:00:06 +08:00
"github.com/spf13/viper"
"go.uber.org/multierr"
"go.uber.org/zap"
"github.com/iyear/tdl/core/dcpool"
"github.com/iyear/tdl/core/downloader"
"github.com/iyear/tdl/core/logctx"
"github.com/iyear/tdl/core/storage"
"github.com/iyear/tdl/core/tclient"
2022-09-01 19:10:08 +08:00
"github.com/iyear/tdl/pkg/consts"
2023-01-28 16:51:09 +08:00
"github.com/iyear/tdl/pkg/key"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/tmessage"
"github.com/iyear/tdl/pkg/utils"
2022-09-01 19:10:08 +08:00
)
type Options struct {
Dir string
RewriteExt bool
SkipSame bool
Template string
URLs []string
Files []string
Include []string
Exclude []string
Desc bool
Takeout bool
Group bool // auto detect grouped message
2023-02-14 22:35:37 +08:00
// resume opts
Continue, Restart bool
// serve
Serve bool
Port int
}
2023-01-28 23:26:52 +08:00
type parser struct {
Data []string
Parser tmessage.ParseSource
2023-01-28 23:26:52 +08:00
}
func Run(ctx context.Context, c *telegram.Client, kvd storage.Storage, opts Options) (rerr error) {
2023-12-11 11:23:04 +08:00
pool := dcpool.NewPool(c,
int64(viper.GetInt(consts.FlagPoolSize)),
tclient.NewDefaultMiddlewares(ctx, viper.GetDuration(consts.FlagReconnectTimeout))...)
defer multierr.AppendInvoke(&rerr, multierr.Close(pool))
parsers := []parser{
{Data: opts.URLs, Parser: tmessage.FromURL(ctx, pool, kvd, opts.URLs)},
{Data: opts.Files, Parser: tmessage.FromFile(ctx, pool, kvd, opts.Files, true)},
}
dialogs, err := collectDialogs(parsers)
if err != nil {
return err
}
logctx.From(ctx).Debug("Collect dialogs",
zap.Any("dialogs", dialogs))
2022-12-21 19:46:27 +08:00
if opts.Serve {
return serve(ctx, kvd, pool, dialogs, opts.Port, opts.Takeout)
}
2022-12-21 19:46:27 +08:00
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))
it, err := newIter(pool, manager, dialogs, opts, viper.GetDuration(consts.FlagDelay))
if err != nil {
return err
}
if !opts.Restart {
// resume download and ask user to continue
if err = resume(ctx, kvd, it, !opts.Continue); err != nil {
2022-12-21 19:46:27 +08:00
return err
}
} else {
color.Yellow("Restart download by 'restart' flag")
}
2022-12-21 19:46:27 +08:00
defer func() { // save progress
if rerr != nil { // download is interrupted
multierr.AppendInto(&rerr, saveProgress(ctx, kvd, it))
} else { // if finished, we should clear resume key
multierr.AppendInto(&rerr, kvd.Delete(ctx, key.Resume(it.Fingerprint())))
}
}()
dlProgress := prog.New(utils.Byte.FormatBinaryBytes)
dlProgress.SetNumTrackersExpected(it.Total())
prog.EnablePS(ctx, dlProgress)
options := downloader.Options{
Pool: pool,
Threads: viper.GetInt(consts.FlagThreads),
Iter: it,
Progress: newProgress(dlProgress, it, opts),
}
limit := viper.GetInt(consts.FlagLimit)
2023-01-30 20:50:29 +08:00
logctx.From(ctx).Info("Start download",
zap.String("dir", opts.Dir),
zap.Bool("rewrite_ext", opts.RewriteExt),
zap.Bool("skip_same", opts.SkipSame),
zap.Int("threads", options.Threads),
zap.Int("limit", limit))
2023-01-30 20:50:29 +08:00
color.Green("All files will be downloaded to '%s' dir", opts.Dir)
go dlProgress.Render()
defer prog.Wait(ctx, dlProgress)
return downloader.New(options).Download(ctx, limit)
2022-09-01 19:10:08 +08:00
}
2023-01-28 16:51:09 +08:00
func collectDialogs(parsers []parser) ([][]*tmessage.Dialog, error) {
var dialogs [][]*tmessage.Dialog
2023-01-28 23:26:52 +08:00
for _, p := range parsers {
d, err := tmessage.Parse(p.Parser)
2023-01-28 23:26:52 +08:00
if err != nil {
return nil, err
}
dialogs = append(dialogs, d)
}
return dialogs, nil
}
func resume(ctx context.Context, kvd storage.Storage, iter *iter, ask bool) error {
logctx.From(ctx).Debug("Check resume key",
2023-01-30 20:50:29 +08:00
zap.String("fingerprint", iter.Fingerprint()))
b, err := kvd.Get(ctx, key.Resume(iter.Fingerprint()))
if err != nil && !errors.Is(err, storage.ErrNotFound) {
2023-01-28 16:51:09 +08:00
return err
}
if len(b) == 0 { // no progress
return nil
}
finished := make(map[int]struct{})
if err = json.Unmarshal(b, &finished); err != nil {
return err
}
// finished is empty, no need to resume
if len(finished) == 0 {
return nil
}
confirm := false
resumeStr := fmt.Sprintf("Found unfinished download, continue from '%d/%d'", len(finished), iter.Total())
2023-02-14 22:35:37 +08:00
if ask {
if err = survey.AskOne(&survey.Confirm{
Message: color.YellowString(resumeStr + "?"),
}, &confirm); err != nil {
return err
}
} else {
color.Yellow(resumeStr)
confirm = true
2023-01-28 16:51:09 +08:00
}
logctx.From(ctx).Debug("Resume download",
2023-01-30 20:50:29 +08:00
zap.Int("finished", len(finished)))
2023-01-28 16:51:09 +08:00
if !confirm {
// clear resume key
return kvd.Delete(ctx, key.Resume(iter.Fingerprint()))
2023-01-28 16:51:09 +08:00
}
2023-01-28 23:26:52 +08:00
iter.SetFinished(finished)
2023-01-28 16:51:09 +08:00
return nil
}
func saveProgress(ctx context.Context, kvd storage.Storage, it *iter) error {
2023-01-30 20:50:29 +08:00
finished := it.Finished()
logctx.From(ctx).Debug("Save progress",
2023-01-30 20:50:29 +08:00
zap.Int("finished", len(finished)))
b, err := json.Marshal(finished)
2023-01-28 16:51:09 +08:00
if err != nil {
return err
}
return kvd.Set(ctx, key.Resume(it.Fingerprint()), b)
2023-01-28 16:51:09 +08:00
}