mirror of
https://github.com/iyear/tdl
synced 2025-01-08 11:57:55 +08:00
refactor(forward): use tRun to start session
This commit is contained in:
parent
ef751457d8
commit
3f65a374fb
@ -10,6 +10,7 @@ import (
|
||||
"github.com/antonmedv/expr"
|
||||
"github.com/antonmedv/expr/vm"
|
||||
"github.com/go-faster/errors"
|
||||
"github.com/gotd/td/telegram"
|
||||
"github.com/gotd/td/telegram/peers"
|
||||
pw "github.com/jedib0t/go-pretty/v6/progress"
|
||||
"github.com/spf13/viper"
|
||||
@ -20,6 +21,7 @@ import (
|
||||
"github.com/iyear/tdl/pkg/consts"
|
||||
"github.com/iyear/tdl/pkg/dcpool"
|
||||
"github.com/iyear/tdl/pkg/forwarder"
|
||||
"github.com/iyear/tdl/pkg/kv"
|
||||
"github.com/iyear/tdl/pkg/prog"
|
||||
"github.com/iyear/tdl/pkg/storage"
|
||||
"github.com/iyear/tdl/pkg/texpr"
|
||||
@ -35,7 +37,7 @@ type Options struct {
|
||||
DryRun bool
|
||||
}
|
||||
|
||||
func Run(ctx context.Context, opts Options) error {
|
||||
func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr error) {
|
||||
if opts.To == "-" {
|
||||
fg := texpr.NewFieldsGetter(nil)
|
||||
|
||||
@ -48,60 +50,54 @@ func Run(ctx context.Context, opts Options) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
c, kvd, err := tgc.NoLogin(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx = tctx.WithKV(ctx, kvd)
|
||||
|
||||
return tgc.RunWithAuth(ctx, c, func(ctx context.Context) (rerr error) {
|
||||
middlewares, err := tgc.NewDefaultMiddlewares(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "create middlewares")
|
||||
}
|
||||
middlewares, err := tgc.NewDefaultMiddlewares(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "create middlewares")
|
||||
}
|
||||
|
||||
pool := dcpool.NewPool(c, int64(viper.GetInt(consts.FlagPoolSize)), middlewares...)
|
||||
defer multierr.AppendInvoke(&rerr, multierr.Close(pool))
|
||||
pool := dcpool.NewPool(c, int64(viper.GetInt(consts.FlagPoolSize)), middlewares...)
|
||||
defer multierr.AppendInvoke(&rerr, multierr.Close(pool))
|
||||
|
||||
ctx = tctx.WithPool(ctx, pool)
|
||||
ctx = tctx.WithPool(ctx, pool)
|
||||
|
||||
dialogs, err := collectDialogs(ctx, opts.From)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "collect dialogs")
|
||||
}
|
||||
dialogs, err := collectDialogs(ctx, opts.From)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "collect dialogs")
|
||||
}
|
||||
|
||||
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))
|
||||
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))
|
||||
|
||||
to, err := resolveDestPeer(ctx, manager, opts.To)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "resolve dest peer")
|
||||
}
|
||||
to, err := resolveDestPeer(ctx, manager, opts.To)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "resolve dest peer")
|
||||
}
|
||||
|
||||
fwProgress := prog.New(pw.FormatNumber)
|
||||
fwProgress.SetNumTrackersExpected(totalMessages(dialogs))
|
||||
prog.EnablePS(ctx, fwProgress)
|
||||
fwProgress := prog.New(pw.FormatNumber)
|
||||
fwProgress.SetNumTrackersExpected(totalMessages(dialogs))
|
||||
prog.EnablePS(ctx, fwProgress)
|
||||
|
||||
fw := forwarder.New(forwarder.Options{
|
||||
Pool: pool,
|
||||
Iter: newIter(iterOptions{
|
||||
manager: manager,
|
||||
pool: pool,
|
||||
to: to,
|
||||
dialogs: dialogs,
|
||||
mode: opts.Mode,
|
||||
silent: opts.Silent,
|
||||
dryRun: opts.DryRun,
|
||||
}),
|
||||
Progress: newProgress(fwProgress),
|
||||
PartSize: viper.GetInt(consts.FlagPartSize),
|
||||
Threads: viper.GetInt(consts.FlagThreads),
|
||||
})
|
||||
|
||||
go fwProgress.Render()
|
||||
defer prog.Wait(ctx, fwProgress)
|
||||
|
||||
return fw.Forward(ctx)
|
||||
fw := forwarder.New(forwarder.Options{
|
||||
Pool: pool,
|
||||
Iter: newIter(iterOptions{
|
||||
manager: manager,
|
||||
pool: pool,
|
||||
to: to,
|
||||
dialogs: dialogs,
|
||||
mode: opts.Mode,
|
||||
silent: opts.Silent,
|
||||
dryRun: opts.DryRun,
|
||||
}),
|
||||
Progress: newProgress(fwProgress),
|
||||
PartSize: viper.GetInt(consts.FlagPartSize),
|
||||
Threads: viper.GetInt(consts.FlagThreads),
|
||||
})
|
||||
|
||||
go fwProgress.Render()
|
||||
defer prog.Wait(ctx, fwProgress)
|
||||
|
||||
return fw.Forward(ctx)
|
||||
}
|
||||
|
||||
func collectDialogs(ctx context.Context, input []string) ([]*tmessage.Dialog, error) {
|
||||
|
@ -1,13 +1,16 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/gotd/td/telegram"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/iyear/tdl/app/forward"
|
||||
"github.com/iyear/tdl/pkg/forwarder"
|
||||
"github.com/iyear/tdl/pkg/kv"
|
||||
"github.com/iyear/tdl/pkg/logger"
|
||||
)
|
||||
|
||||
@ -18,7 +21,9 @@ func NewForward() *cobra.Command {
|
||||
Use: "forward",
|
||||
Short: "Forward messages with automatic fallback and message routing",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return forward.Run(logger.Named(cmd.Context(), "forward"), opts)
|
||||
return tRun(cmd.Context(), false, func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
|
||||
return forward.Run(logger.Named(cmd.Context(), "forward"), c, kvd, opts)
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user