From 19f673c5d072c2a53593361d6894d629a61516ae Mon Sep 17 00:00:00 2001 From: iyear Date: Sat, 13 Jan 2024 15:42:22 +0800 Subject: [PATCH] feat(forward): support reply to message or topic. close #452 --- app/forward/elem.go | 9 ++++-- app/forward/forward.go | 9 +++--- app/forward/iter.go | 60 ++++++++++++++++++++++++++++---------- pkg/forwarder/forwarder.go | 45 +++++++++++++++++++++------- pkg/forwarder/iter.go | 1 + 5 files changed, 91 insertions(+), 33 deletions(-) diff --git a/app/forward/elem.go b/app/forward/elem.go index 629376a..a4faad9 100644 --- a/app/forward/elem.go +++ b/app/forward/elem.go @@ -8,9 +8,10 @@ import ( ) type iterElem struct { - from peers.Peer - msg *tg.Message - to peers.Peer + from peers.Peer + msg *tg.Message + to peers.Peer + thread int opts iterOptions } @@ -23,6 +24,8 @@ func (i *iterElem) Msg() *tg.Message { return i.msg } func (i *iterElem) To() peers.Peer { return i.to } +func (i *iterElem) Thread() int { return i.thread } + func (i *iterElem) AsSilent() bool { return i.opts.silent } func (i *iterElem) AsDryRun() bool { return i.opts.dryRun } diff --git a/app/forward/forward.go b/app/forward/forward.go index f7e4d94..6efa09c 100644 --- a/app/forward/forward.go +++ b/app/forward/forward.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "reflect" "strings" "github.com/antonmedv/expr" @@ -66,7 +65,7 @@ func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx)) - to, err := resolveDestPeer(ctx, manager, opts.To) + to, err := resolveDest(ctx, manager, opts.To) if err != nil { return errors.Wrap(err, "resolve dest peer") } @@ -125,11 +124,11 @@ func collectDialogs(ctx context.Context, input []string) ([]*tmessage.Dialog, er return dialogs, nil } -// resolveDestPeer parses the input string and returns a vm.Program. It can be a CHAT, a text or a file based on expression engine. -func resolveDestPeer(ctx context.Context, manager *peers.Manager, input string) (*vm.Program, error) { +// resolveDest parses the input string and returns a vm.Program. It can be a CHAT, a text or a file based on expression engine. +func resolveDest(ctx context.Context, manager *peers.Manager, input string) (*vm.Program, error) { compile := func(i string) (*vm.Program, error) { // we pass empty peer and message to enable type checking - return expr.Compile(i, expr.AsKind(reflect.String), expr.Env(exprEnv(nil, nil))) + return expr.Compile(i, expr.Env(exprEnv(nil, nil))) } // default diff --git a/app/forward/iter.go b/app/forward/iter.go index fc05050..d38811c 100644 --- a/app/forward/iter.go +++ b/app/forward/iter.go @@ -7,6 +7,7 @@ import ( "github.com/go-faster/errors" "github.com/gotd/td/telegram/peers" "github.com/gotd/td/tg" + "github.com/mitchellh/mapstructure" "github.com/iyear/tdl/pkg/dcpool" "github.com/iyear/tdl/pkg/forwarder" @@ -58,6 +59,11 @@ func exprEnv(from peers.Peer, msg *tg.Message) env { return e } +type dest struct { + Peer string + Thread int +} + func newIter(opts iterOptions) *iter { return &iter{ opts: opts, @@ -107,34 +113,58 @@ func (i *iter) Next(ctx context.Context) bool { i.err = errors.Wrap(err, "message routing") return false } - destPeer, ok := result.(string) - if !ok { - i.err = errors.Errorf("message router must return string: %T", result) + + var ( + to peers.Peer + thread int + ) + + switch r := result.(type) { + case string: + // pure chat, no reply to, which is a compatible with old version + // and a convenient way to send message to self + to, err = i.resolvePeer(ctx, r) + case map[string]interface{}: + // chat with reply to topic or message + var d dest + + if err = mapstructure.WeakDecode(r, &d); err != nil { + i.err = errors.Wrapf(err, "decode dest: %v", result) + return false + } + + to, err = i.resolvePeer(ctx, d.Peer) + thread = d.Thread + + default: + i.err = errors.Errorf("message router must return string or dest: %T", result) return false } - var to peers.Peer - if destPeer == "" { // self - to, err = i.opts.manager.Self(ctx) - } else { - to, err = utils.Telegram.GetInputPeer(ctx, i.opts.manager, destPeer) - } - if err != nil { - i.err = errors.Wrapf(err, "resolve dest peer: %s", destPeer) + i.err = errors.Wrapf(err, "resolve dest: %v", result) return false } i.elem = &iterElem{ - from: from, - msg: msg, - to: to, - opts: i.opts, + from: from, + msg: msg, + to: to, + thread: thread, + opts: i.opts, } return true } +func (i *iter) resolvePeer(ctx context.Context, peer string) (peers.Peer, error) { + if peer == "" { // self + return i.opts.manager.Self(ctx) + } + + return utils.Telegram.GetInputPeer(ctx, i.opts.manager, peer) +} + func (i *iter) Value() forwarder.Elem { return i.elem } diff --git a/pkg/forwarder/forwarder.go b/pkg/forwarder/forwarder.go index 220e532..978e717 100644 --- a/pkg/forwarder/forwarder.go +++ b/pkg/forwarder/forwarder.go @@ -7,7 +7,6 @@ import ( "github.com/go-faster/errors" "github.com/gotd/td/bin" - "github.com/gotd/td/telegram/message" "github.com/gotd/td/telegram/peers" "github.com/gotd/td/tg" "go.uber.org/atomic" @@ -121,7 +120,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t Noforwards: false, UpdateStickersetsOrder: false, Peer: elem.To().InputPeer(), - ReplyTo: nil, + ReplyTo: getReplyTo(elem.Thread()), Message: msg.Message, RandomID: f.rand.Int63(), ReplyMarkup: msg.ReplyMarkup, @@ -252,10 +251,27 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t case ModeDirect: // it can be forwarded via API if !protectedDialog(elem.From()) && !protectedMessage(elem.Msg()) { - builder := message.NewSender(f.forwardClient(ctx, elem)). - To(elem.To().InputPeer()).CloneBuilder() - if elem.AsSilent() { - builder = builder.Silent() + directForward := func(ids ...int) error { + req := &tg.MessagesForwardMessagesRequest{ + Silent: elem.AsSilent(), + Background: false, + WithMyScore: false, + DropAuthor: false, + DropMediaCaptions: false, + Noforwards: false, + FromPeer: elem.From().InputPeer(), + ID: ids, + RandomID: nil, + ToPeer: elem.To().InputPeer(), + TopMsgID: elem.Thread(), + ScheduleDate: 0, + SendAs: nil, + } + req.SetFlags() + if _, err := f.forwardClient(ctx, elem).MessagesForwardMessages(ctx, req); err != nil { + return errors.Wrap(err, "directly forward") + } + return nil } if len(grouped) > 0 { @@ -264,14 +280,14 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t ids = append(ids, m.ID) } - if _, err := builder.ForwardIDs(elem.From().InputPeer(), ids[0], ids[1:]...).Send(ctx); err != nil { + if err = directForward(ids...); err != nil { goto fallback } return nil } - if _, err := builder.ForwardIDs(elem.From().InputPeer(), elem.Msg().ID).Send(ctx); err != nil { + if err = directForward(elem.Msg().ID); err != nil { goto fallback } return nil @@ -307,7 +323,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t Noforwards: false, UpdateStickersetsOrder: false, Peer: elem.To().InputPeer(), - ReplyTo: nil, + ReplyTo: getReplyTo(elem.Thread()), MultiMedia: media, ScheduleDate: 0, SendAs: nil, @@ -335,7 +351,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t Noforwards: false, UpdateStickersetsOrder: false, Peer: elem.To().InputPeer(), - ReplyTo: nil, + ReplyTo: getReplyTo(elem.Thread()), Media: media, Message: elem.Msg().Message, RandomID: rand.Int63(), @@ -439,3 +455,12 @@ func mediaSizeSum(msg *tg.Message, grouped ...*tg.Message) (int64, error) { return m.Size, nil } + +func getReplyTo(thread int) tg.InputReplyToClass { + replyTo := &tg.InputReplyToMessage{ + ReplyToMsgID: thread, + } + replyTo.SetFlags() + + return replyTo +} diff --git a/pkg/forwarder/iter.go b/pkg/forwarder/iter.go index 0e1c778..e1004ee 100644 --- a/pkg/forwarder/iter.go +++ b/pkg/forwarder/iter.go @@ -19,6 +19,7 @@ type Elem interface { From() peers.Peer Msg() *tg.Message To() peers.Peer + Thread() int // reply to message/topic AsSilent() bool AsDryRun() bool