refactor(forward): place silent, mode, dry-run in elem

This commit is contained in:
iyear 2023-11-26 20:40:56 +08:00
parent d8789f7c74
commit 98233e1de4
6 changed files with 126 additions and 111 deletions

View File

@ -81,10 +81,15 @@ func Run(ctx context.Context, opts Options) error {
fw := forwarder.New(forwarder.Options{
Pool: pool,
Iter: newIter(manager, pool, to, dialogs),
Silent: opts.Silent,
DryRun: opts.DryRun,
Mode: opts.Mode,
Iter: newIter(iterOptions{
manager: manager,
pool: pool,
to: to,
dialogs: dialogs,
mode: opts.Mode,
silent: opts.Silent,
dryRun: opts.DryRun,
}),
Progress: newProgress(fwProgress),
PartSize: viper.GetInt(consts.FlagPartSize),
})

View File

@ -15,14 +15,22 @@ import (
"github.com/iyear/tdl/pkg/utils"
)
type iter struct {
type iterOptions struct {
manager *peers.Manager
pool dcpool.Pool
to *vm.Program
dialogs []*tmessage.Dialog
i, j int
elem *forwarder.Elem
err error
mode forwarder.Mode
silent bool
dryRun bool
}
type iter struct {
opts iterOptions
i, j int
elem *forwarder.Elem
err error
}
type env struct {
@ -50,12 +58,14 @@ func exprEnv(from peers.Peer, msg *tg.Message) env {
return e
}
func newIter(manager *peers.Manager, pool dcpool.Pool, to *vm.Program, dialogs []*tmessage.Dialog) *iter {
func newIter(opts iterOptions) *iter {
return &iter{
manager: manager,
pool: pool,
to: to,
dialogs: dialogs,
opts: opts,
i: 0,
j: 0,
elem: nil,
err: nil,
}
}
@ -68,31 +78,31 @@ func (i *iter) Next(ctx context.Context) bool {
}
// end of iteration or error occurred
if i.i >= len(i.dialogs) || i.err != nil {
if i.i >= len(i.opts.dialogs) || i.err != nil {
return false
}
p, m := i.dialogs[i.i].Peer, i.dialogs[i.i].Messages[i.j]
p, m := i.opts.dialogs[i.i].Peer, i.opts.dialogs[i.i].Messages[i.j]
if i.j++; i.j >= len(i.dialogs[i.i].Messages) {
if i.j++; i.j >= len(i.opts.dialogs[i.i].Messages) {
i.i++
i.j = 0
}
from, err := i.manager.FromInputPeer(ctx, p)
from, err := i.opts.manager.FromInputPeer(ctx, p)
if err != nil {
i.err = errors.Wrap(err, "get from peer")
return false
}
msg, err := utils.Telegram.GetSingleMessage(ctx, i.pool.Default(ctx), from.InputPeer(), m)
msg, err := utils.Telegram.GetSingleMessage(ctx, i.opts.pool.Default(ctx), from.InputPeer(), m)
if err != nil {
i.err = errors.Wrapf(err, "get message: %d", m)
return false
}
// message routing
result, err := texpr.Run(i.to, exprEnv(from, msg))
result, err := texpr.Run(i.opts.to, exprEnv(from, msg))
if err != nil {
i.err = errors.Wrap(err, "message routing")
return false
@ -105,9 +115,9 @@ func (i *iter) Next(ctx context.Context) bool {
var to peers.Peer
if destPeer == "" { // self
to, err = i.manager.Self(ctx)
to, err = i.opts.manager.Self(ctx)
} else {
to, err = utils.Telegram.GetInputPeer(ctx, i.manager, destPeer)
to, err = utils.Telegram.GetInputPeer(ctx, i.opts.manager, destPeer)
}
if err != nil {
@ -116,9 +126,12 @@ func (i *iter) Next(ctx context.Context) bool {
}
i.elem = &forwarder.Elem{
From: from,
To: to,
Msg: msg,
From: from,
Msg: msg,
To: to,
Silent: i.opts.silent,
DryRun: i.opts.dryRun,
Mode: i.opts.mode,
}
return true

View File

@ -15,44 +15,50 @@ import (
type progress struct {
pw pw.Writer
trackers map[[3]int64]*pw.Tracker
meta map[int64]string
trackers map[tuple]*pw.Tracker
elemName map[int64]string
}
type tuple struct {
from int64
msg int
to int64
}
func newProgress(p pw.Writer) *progress {
return &progress{
pw: p,
trackers: make(map[[3]int64]*pw.Tracker),
meta: make(map[int64]string),
trackers: make(map[tuple]*pw.Tracker),
elemName: make(map[int64]string),
}
}
func (p *progress) OnAdd(meta *forwarder.ProgressMeta) {
tracker := prog.AppendTracker(p.pw, pw.FormatNumber, p.processMessage(meta, false), 1)
p.trackers[p.tuple(meta)] = tracker
func (p *progress) OnAdd(elem *forwarder.Elem) {
tracker := prog.AppendTracker(p.pw, pw.FormatNumber, p.processMessage(elem, false), 1)
p.trackers[p.tuple(elem)] = tracker
}
func (p *progress) OnClone(meta *forwarder.ProgressMeta, state forwarder.ProgressState) {
tracker, ok := p.trackers[p.tuple(meta)]
func (p *progress) OnClone(elem *forwarder.Elem, state forwarder.ProgressState) {
tracker, ok := p.trackers[p.tuple(elem)]
if !ok {
return
}
// display re-upload transfer info
tracker.Units.Formatter = utils.Byte.FormatBinaryBytes
tracker.UpdateMessage(p.processMessage(meta, true))
tracker.UpdateMessage(p.processMessage(elem, true))
tracker.UpdateTotal(state.Total)
tracker.SetValue(state.Done)
}
func (p *progress) OnDone(meta *forwarder.ProgressMeta, err error) {
tracker, ok := p.trackers[p.tuple(meta)]
func (p *progress) OnDone(elem *forwarder.Elem, err error) {
tracker, ok := p.trackers[p.tuple(elem)]
if !ok {
return
}
if err != nil {
p.pw.Log(color.RedString("%s error: %s", p.metaString(meta), err.Error()))
p.pw.Log(color.RedString("%s error: %s", p.metaString(elem), err.Error()))
tracker.MarkAsErrored()
return
}
@ -60,14 +66,18 @@ func (p *progress) OnDone(meta *forwarder.ProgressMeta, err error) {
tracker.Increment(1)
}
func (p *progress) tuple(meta *forwarder.ProgressMeta) [3]int64 {
return [3]int64{meta.From.ID(), int64(meta.Msg.ID), meta.To.ID()}
func (p *progress) tuple(elem *forwarder.Elem) tuple {
return tuple{
from: elem.From.ID(),
msg: elem.Msg.ID,
to: elem.To.ID(),
}
}
func (p *progress) processMessage(meta *forwarder.ProgressMeta, clone bool) string {
func (p *progress) processMessage(elem *forwarder.Elem, clone bool) string {
b := &strings.Builder{}
b.WriteString(p.metaString(meta))
b.WriteString(p.metaString(elem))
if clone {
b.WriteString(" [clone]")
}
@ -75,14 +85,14 @@ func (p *progress) processMessage(meta *forwarder.ProgressMeta, clone bool) stri
return b.String()
}
func (p *progress) metaString(meta *forwarder.ProgressMeta) string {
func (p *progress) metaString(elem *forwarder.Elem) string {
// TODO(iyear): better responsive name
if _, ok := p.meta[meta.From.ID()]; !ok {
p.meta[meta.From.ID()] = runewidth.Truncate(meta.From.VisibleName(), 15, "...")
if _, ok := p.elemName[elem.From.ID()]; !ok {
p.elemName[elem.From.ID()] = runewidth.Truncate(elem.From.VisibleName(), 15, "...")
}
if _, ok := p.meta[meta.To.ID()]; !ok {
p.meta[meta.To.ID()] = runewidth.Truncate(meta.To.VisibleName(), 15, "...")
if _, ok := p.elemName[elem.To.ID()]; !ok {
p.elemName[elem.To.ID()] = runewidth.Truncate(elem.To.VisibleName(), 15, "...")
}
return fmt.Sprintf("%s(%d):%d -> %s(%d)", p.meta[meta.From.ID()], meta.From.ID(), meta.Msg.ID, p.meta[meta.To.ID()], meta.To.ID())
return fmt.Sprintf("%s(%d):%d -> %s(%d)", p.elemName[elem.From.ID()], elem.From.ID(), elem.Msg.ID, p.elemName[elem.To.ID()], elem.To.ID())
}

View File

@ -20,9 +20,9 @@ type CloneOptions struct {
Progress uploader.Progress
}
func (f *Forwarder) CloneMedia(ctx context.Context, opts CloneOptions) (tg.InputFileClass, error) {
func (f *Forwarder) CloneMedia(ctx context.Context, opts CloneOptions,dryRun bool) (tg.InputFileClass, error) {
// if dry run, just return empty input file
if f.opts.DryRun {
if dryRun {
// directly call progress callback
if err := opts.Progress.Chunk(ctx, uploader.ProgressState{
Uploaded: opts.Media.Size,

View File

@ -35,15 +35,16 @@ type Elem struct {
From peers.Peer
Msg *tg.Message
To peers.Peer
Silent bool
DryRun bool
Mode Mode
}
type Options struct {
Pool dcpool.Pool
PartSize int
Iter Iter
Silent bool
DryRun bool
Mode Mode
Progress Progress
}
@ -75,14 +76,14 @@ func (f *Forwarder) Forward(ctx context.Context) error {
continue
}
if err = f.forwardMessage(ctx, elem.From, elem.To, elem.Msg, grouped...); err != nil {
if err = f.forwardMessage(ctx, elem, grouped...); err != nil {
continue
}
continue
}
if err := f.forwardMessage(ctx, elem.From, elem.To, elem.Msg); err != nil {
if err := f.forwardMessage(ctx, elem); err != nil {
// canceled by user, so we directly return error to stop all
if errors.Is(err, context.Canceled) {
return err
@ -94,28 +95,22 @@ func (f *Forwarder) Forward(ctx context.Context) error {
return f.opts.Iter.Err()
}
func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg *tg.Message, grouped ...*tg.Message) (rerr error) {
meta := &ProgressMeta{
From: from,
Msg: msg,
To: to,
}
f.opts.Progress.OnAdd(meta)
func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*tg.Message) (rerr error) {
f.opts.Progress.OnAdd(elem)
defer func() {
f.sent[f.sentTuple(from, msg)] = struct{}{}
f.sent[f.sentTuple(elem.From, elem.Msg)] = struct{}{}
// grouped message also should be marked as sent
for _, m := range grouped {
f.sent[f.sentTuple(from, m)] = struct{}{}
f.sent[f.sentTuple(elem.From, m)] = struct{}{}
}
f.opts.Progress.OnDone(meta, rerr)
f.opts.Progress.OnDone(elem, rerr)
}()
log := logger.From(ctx).With(
zap.Int64("from", from.ID()),
zap.Int64("to", to.ID()),
zap.Int("message", msg.ID))
zap.Int64("from", elem.From.ID()),
zap.Int64("to", elem.To.ID()),
zap.Int("message", elem.Msg.ID))
forwardTextOnly := func(msg *tg.Message) error {
if msg.Message == "" {
@ -123,12 +118,12 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
}
req := &tg.MessagesSendMessageRequest{
NoWebpage: false,
Silent: f.opts.Silent,
Silent: elem.Silent,
Background: false,
ClearDraft: false,
Noforwards: false,
UpdateStickersetsOrder: false,
Peer: to.InputPeer(),
Peer: elem.To.InputPeer(),
ReplyTo: nil,
Message: msg.Message,
RandomID: f.rand.Int63(),
@ -139,7 +134,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
}
req.SetFlags()
if _, err := f.forwardClient(ctx).MessagesSendMessage(ctx, req); err != nil {
if _, err := f.forwardClient(ctx, elem).MessagesSendMessage(ctx, req); err != nil {
return errors.Wrap(err, "send message")
}
return nil
@ -157,7 +152,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
// we should clone photo and document via re-upload, it will be banned if we forward it directly.
// but other media can be forwarded directly via copy
if (!protectedDialog(from) && !protectedMessage(msg)) || !photoOrDocument(msg.Media) {
if (!protectedDialog(elem.From) && !protectedMessage(msg)) || !photoOrDocument(msg.Media) {
media, ok := tmedia.ConvInputMedia(msg.Media)
if !ok {
return nil, errors.Errorf("can't convert message %d to input class directly", msg.ID)
@ -168,7 +163,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
media, ok := tmedia.GetMedia(msg)
if !ok {
log.Warn("Can't get media from message",
zap.Int64("peer", from.ID()),
zap.Int64("peer", elem.From.ID()),
zap.Int("message", msg.ID))
// unsupported re-upload media
@ -179,10 +174,10 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
Media: media,
PartSize: f.opts.PartSize,
Progress: uploadProgress{
meta: meta,
elem: elem,
progress: f.opts.Progress,
},
})
}, elem.DryRun)
if err != nil {
return nil, errors.Wrap(err, "clone media")
}
@ -212,7 +207,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
Media: thumb,
PartSize: f.opts.PartSize,
Progress: nopProgress{},
})
}, elem.DryRun)
if err != nil {
return nil, errors.Wrap(err, "clone thumb")
}
@ -236,13 +231,13 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
}
}
switch f.opts.Mode {
switch elem.Mode {
case ModeDirect:
// it can be forwarded via API
if !protectedDialog(from) && !protectedMessage(msg) {
builder := message.NewSender(f.forwardClient(ctx)).
To(to.InputPeer()).CloneBuilder()
if f.opts.Silent {
if !protectedDialog(elem.From) && !protectedMessage(elem.Msg) {
builder := message.NewSender(f.forwardClient(ctx, elem)).
To(elem.To.InputPeer()).CloneBuilder()
if elem.Silent {
builder = builder.Silent()
}
@ -252,14 +247,14 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
ids = append(ids, m.ID)
}
if _, err := builder.ForwardIDs(from.InputPeer(), ids[0], ids[1:]...).Send(ctx); err != nil {
if _, err := builder.ForwardIDs(elem.From.InputPeer(), ids[0], ids[1:]...).Send(ctx); err != nil {
goto fallback
}
return nil
}
if _, err := builder.ForwardIDs(from.InputPeer(), msg.ID).Send(ctx); err != nil {
if _, err := builder.ForwardIDs(elem.From.InputPeer(), elem.Msg.ID).Send(ctx); err != nil {
goto fallback
}
return nil
@ -288,58 +283,58 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
if len(media) > 0 {
req := &tg.MessagesSendMultiMediaRequest{
Silent: f.opts.Silent,
Silent: elem.Silent,
Background: false,
ClearDraft: false,
Noforwards: false,
UpdateStickersetsOrder: false,
Peer: to.InputPeer(),
Peer: elem.To.InputPeer(),
ReplyTo: nil,
MultiMedia: media,
ScheduleDate: 0,
SendAs: nil,
}
req.SetFlags()
if _, err := f.forwardClient(ctx).MessagesSendMultiMedia(ctx, req); err != nil {
if _, err := f.forwardClient(ctx, elem).MessagesSendMultiMedia(ctx, req); err != nil {
return errors.Wrap(err, "send multi media")
}
return nil
}
return forwardTextOnly(msg)
return forwardTextOnly(elem.Msg)
}
media, err := convForwardedMedia(msg)
media, err := convForwardedMedia(elem.Msg)
if err != nil {
log.Debug("Can't convert forwarded media", zap.Error(err))
return forwardTextOnly(msg)
return forwardTextOnly(elem.Msg)
}
// send text copy with forwarded media
req := &tg.MessagesSendMediaRequest{
Silent: f.opts.Silent,
Silent: elem.Silent,
Background: false,
ClearDraft: false,
Noforwards: false,
UpdateStickersetsOrder: false,
Peer: to.InputPeer(),
Peer: elem.To.InputPeer(),
ReplyTo: nil,
Media: media,
Message: msg.Message,
Message: elem.Msg.Message,
RandomID: rand.Int63(),
ReplyMarkup: msg.ReplyMarkup,
Entities: msg.Entities,
ReplyMarkup: elem.Msg.ReplyMarkup,
Entities: elem.Msg.Entities,
ScheduleDate: 0,
SendAs: nil,
}
req.SetFlags()
if _, err := f.forwardClient(ctx).MessagesSendMedia(ctx, req); err != nil {
if _, err := f.forwardClient(ctx, elem).MessagesSendMedia(ctx, req); err != nil {
return errors.Wrap(err, "send single media")
}
return nil
}
return fmt.Errorf("unknown mode: %s", f.opts.Mode)
return fmt.Errorf("unknown mode: %s", elem.Mode)
}
func (f *Forwarder) sentTuple(peer peers.Peer, msg *tg.Message) [2]int64 {
@ -352,8 +347,8 @@ func (n nopInvoker) Invoke(_ context.Context, _ bin.Encoder, _ bin.Decoder) erro
return nil
}
func (f *Forwarder) forwardClient(ctx context.Context) *tg.Client {
if f.opts.DryRun {
func (f *Forwarder) forwardClient(ctx context.Context, elem *Elem) *tg.Client {
if elem.DryRun {
return tg.NewClient(nopInvoker{})
}

View File

@ -3,21 +3,13 @@ package forwarder
import (
"context"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/telegram/uploader"
"github.com/gotd/td/tg"
)
type Progress interface {
OnAdd(meta *ProgressMeta)
OnClone(meta *ProgressMeta, state ProgressState)
OnDone(meta *ProgressMeta, err error)
}
type ProgressMeta struct {
From peers.Peer
Msg *tg.Message
To peers.Peer
OnAdd(elem *Elem)
OnClone(elem *Elem, state ProgressState)
OnDone(elem *Elem, err error)
}
type ProgressState struct {
@ -26,12 +18,12 @@ type ProgressState struct {
}
type uploadProgress struct {
meta *ProgressMeta
elem *Elem
progress Progress
}
func (p uploadProgress) Chunk(_ context.Context, state uploader.ProgressState) error {
p.progress.OnClone(p.meta, ProgressState{
p.progress.OnClone(p.elem, ProgressState{
Done: state.Uploaded,
Total: state.Total,
})