refactor(dl): use tRun to start session

This commit is contained in:
iyear 2023-12-11 00:12:02 +08:00
parent 9526aee6f9
commit dfbb368ed9
3 changed files with 96 additions and 69 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/AlecAivazis/survey/v2"
"github.com/fatih/color"
"github.com/go-faster/errors"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/peers"
"github.com/spf13/viper"
"go.uber.org/multierr"
@ -51,88 +52,81 @@ type parser struct {
Parser tmessage.ParseSource
}
func Run(ctx context.Context, opts Options) error {
c, kvd, err := tgc.NoLogin(ctx)
func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr error) {
middlewares, err := tgc.NewDefaultMiddlewares(ctx)
if err != nil {
return errors.Wrap(err, "create middlewares")
}
pool := dcpool.NewPool(c, int64(viper.GetInt(consts.FlagPoolSize)), middlewares...)
defer multierr.AppendInvoke(&rerr, multierr.Close(pool))
parsers := []parser{
{Data: opts.URLs, Parser: tmessage.FromURL(ctx, pool, kvd, opts.URLs)},
{Data: opts.Files, Parser: tmessage.FromFile(ctx, pool, kvd, opts.Files, true)},
}
dialogs, err := collectDialogs(parsers)
if err != nil {
return err
}
logger.From(ctx).Debug("Collect dialogs",
zap.Any("dialogs", dialogs))
if opts.Serve {
return serve(ctx, kvd, pool, dialogs, opts.Port, opts.Takeout)
}
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))
it, err := newIter(pool, manager, dialogs, opts)
if err != nil {
return err
}
return tgc.RunWithAuth(ctx, c, func(ctx context.Context) (rerr error) {
middlewares, err := tgc.NewDefaultMiddlewares(ctx)
if err != nil {
return errors.Wrap(err, "create middlewares")
}
pool := dcpool.NewPool(c, int64(viper.GetInt(consts.FlagPoolSize)), middlewares...)
defer multierr.AppendInvoke(&rerr, multierr.Close(pool))
parsers := []parser{
{Data: opts.URLs, Parser: tmessage.FromURL(ctx, pool, kvd, opts.URLs)},
{Data: opts.Files, Parser: tmessage.FromFile(ctx, pool, kvd, opts.Files, true)},
}
dialogs, err := collectDialogs(parsers)
if err != nil {
if !opts.Restart {
// resume download and ask user to continue
if err = resume(ctx, kvd, it, !opts.Continue); err != nil {
return err
}
logger.From(ctx).Debug("Collect dialogs",
zap.Any("dialogs", dialogs))
} else {
color.Yellow("Restart download by 'restart' flag")
}
if opts.Serve {
return serve(ctx, kvd, pool, dialogs, opts.Port, opts.Takeout)
defer func() { // save progress
if rerr != nil { // download is interrupted
multierr.AppendInto(&rerr, saveProgress(ctx, kvd, it))
} else { // if finished, we should clear resume key
multierr.AppendInto(&rerr, kvd.Delete(key.Resume(it.Fingerprint())))
}
}()
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))
dlProgress := prog.New(utils.Byte.FormatBinaryBytes)
dlProgress.SetNumTrackersExpected(it.Total())
prog.EnablePS(ctx, dlProgress)
it, err := newIter(pool, manager, dialogs, opts)
if err != nil {
return err
}
options := downloader.Options{
Pool: pool,
PartSize: viper.GetInt(consts.FlagPartSize),
Threads: viper.GetInt(consts.FlagThreads),
Iter: it,
Progress: newProgress(dlProgress, it, opts),
}
limit := viper.GetInt(consts.FlagLimit)
if !opts.Restart {
// resume download and ask user to continue
if err = resume(ctx, kvd, it, !opts.Continue); err != nil {
return err
}
} else {
color.Yellow("Restart download by 'restart' flag")
}
logger.From(ctx).Info("Start download",
zap.String("dir", opts.Dir),
zap.Bool("rewrite_ext", opts.RewriteExt),
zap.Bool("skip_same", opts.SkipSame),
zap.Int("part_size", options.PartSize),
zap.Int("threads", options.Threads),
zap.Int("limit", limit))
defer func() { // save progress
if rerr != nil { // download is interrupted
multierr.AppendInto(&rerr, saveProgress(ctx, kvd, it))
} else { // if finished, we should clear resume key
multierr.AppendInto(&rerr, kvd.Delete(key.Resume(it.Fingerprint())))
}
}()
color.Green("All files will be downloaded to '%s' dir", opts.Dir)
dlProgress := prog.New(utils.Byte.FormatBinaryBytes)
dlProgress.SetNumTrackersExpected(it.Total())
prog.EnablePS(ctx, dlProgress)
go dlProgress.Render()
defer prog.Wait(ctx, dlProgress)
options := downloader.Options{
Pool: pool,
PartSize: viper.GetInt(consts.FlagPartSize),
Threads: viper.GetInt(consts.FlagThreads),
Iter: it,
Progress: newProgress(dlProgress, it, opts),
}
limit := viper.GetInt(consts.FlagLimit)
logger.From(ctx).Info("Start download",
zap.String("dir", opts.Dir),
zap.Bool("rewrite_ext", opts.RewriteExt),
zap.Bool("skip_same", opts.SkipSame),
zap.Int("part_size", options.PartSize),
zap.Int("threads", options.Threads),
zap.Int("limit", limit))
color.Green("All files will be downloaded to '%s' dir", opts.Dir)
go dlProgress.Render()
defer prog.Wait(ctx, dlProgress)
return downloader.New(options).Download(ctx, limit)
})
return downloader.New(options).Download(ctx, limit)
}
func collectDialogs(parsers []parser) ([][]*tmessage.Dialog, error) {

View File

@ -1,14 +1,17 @@
package cmd
import (
"context"
"fmt"
"strings"
"github.com/gotd/td/telegram"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/iyear/tdl/app/dl"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
)
@ -25,7 +28,10 @@ func NewDownload() *cobra.Command {
}
opts.Template = viper.GetString(consts.FlagDlTemplate)
return dl.Run(logger.Named(cmd.Context(), "dl"), opts)
return tRun(cmd.Context(), false, func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return dl.Run(logger.Named(ctx, "dl"), c, kvd, opts)
})
},
}

View File

@ -1,12 +1,14 @@
package cmd
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/go-faster/errors"
"github.com/gotd/td/telegram"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/multierr"
@ -15,6 +17,7 @@ import (
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
"github.com/iyear/tdl/pkg/tclient"
)
func New() *cobra.Command {
@ -126,3 +129,27 @@ func completeExtFiles(ext ...string) completeFunc {
return files, cobra.ShellCompDirectiveFilterDirs
}
}
func tRun(ctx context.Context, login bool, f func(ctx context.Context, c *telegram.Client, kvd kv.KV) error, middlewares ...telegram.Middleware) error {
// init tclient kv
kvd, err := kv.From(ctx).Open(viper.GetString(consts.FlagNamespace))
if err != nil {
return errors.Wrap(err, "open kv storage")
}
o := tclient.Options{
KV: kvd,
Proxy: viper.GetString(consts.FlagProxy),
NTP: viper.GetString(consts.FlagNTP),
ReconnectTimeout: viper.GetDuration(consts.FlagReconnectTimeout),
Test: viper.GetString(consts.FlagTest) != "",
}
client, err := tclient.New(ctx, o, login, middlewares...)
if err != nil {
return errors.Wrap(err, "create client")
}
return tclient.Run(ctx, client, func(ctx context.Context) error {
return f(ctx, client, kvd)
})
}