feat(forward): support reply to message or topic. close #452

This commit is contained in:
iyear 2024-01-13 15:42:22 +08:00
parent c3de846ad1
commit 19f673c5d0
5 changed files with 91 additions and 33 deletions

View File

@ -8,9 +8,10 @@ import (
)
type iterElem struct {
from peers.Peer
msg *tg.Message
to peers.Peer
from peers.Peer
msg *tg.Message
to peers.Peer
thread int
opts iterOptions
}
@ -23,6 +24,8 @@ func (i *iterElem) Msg() *tg.Message { return i.msg }
func (i *iterElem) To() peers.Peer { return i.to }
func (i *iterElem) Thread() int { return i.thread }
func (i *iterElem) AsSilent() bool { return i.opts.silent }
func (i *iterElem) AsDryRun() bool { return i.opts.dryRun }

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
"reflect"
"strings"
"github.com/antonmedv/expr"
@ -66,7 +65,7 @@ func Run(ctx context.Context, c *telegram.Client, kvd kv.KV, opts Options) (rerr
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))
to, err := resolveDestPeer(ctx, manager, opts.To)
to, err := resolveDest(ctx, manager, opts.To)
if err != nil {
return errors.Wrap(err, "resolve dest peer")
}
@ -125,11 +124,11 @@ func collectDialogs(ctx context.Context, input []string) ([]*tmessage.Dialog, er
return dialogs, nil
}
// resolveDestPeer parses the input string and returns a vm.Program. It can be a CHAT, a text or a file based on expression engine.
func resolveDestPeer(ctx context.Context, manager *peers.Manager, input string) (*vm.Program, error) {
// resolveDest parses the input string and returns a vm.Program. It can be a CHAT, a text or a file based on expression engine.
func resolveDest(ctx context.Context, manager *peers.Manager, input string) (*vm.Program, error) {
compile := func(i string) (*vm.Program, error) {
// we pass empty peer and message to enable type checking
return expr.Compile(i, expr.AsKind(reflect.String), expr.Env(exprEnv(nil, nil)))
return expr.Compile(i, expr.Env(exprEnv(nil, nil)))
}
// default

View File

@ -7,6 +7,7 @@ import (
"github.com/go-faster/errors"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/tg"
"github.com/mitchellh/mapstructure"
"github.com/iyear/tdl/pkg/dcpool"
"github.com/iyear/tdl/pkg/forwarder"
@ -58,6 +59,11 @@ func exprEnv(from peers.Peer, msg *tg.Message) env {
return e
}
type dest struct {
Peer string
Thread int
}
func newIter(opts iterOptions) *iter {
return &iter{
opts: opts,
@ -107,34 +113,58 @@ func (i *iter) Next(ctx context.Context) bool {
i.err = errors.Wrap(err, "message routing")
return false
}
destPeer, ok := result.(string)
if !ok {
i.err = errors.Errorf("message router must return string: %T", result)
var (
to peers.Peer
thread int
)
switch r := result.(type) {
case string:
// pure chat, no reply to, which is a compatible with old version
// and a convenient way to send message to self
to, err = i.resolvePeer(ctx, r)
case map[string]interface{}:
// chat with reply to topic or message
var d dest
if err = mapstructure.WeakDecode(r, &d); err != nil {
i.err = errors.Wrapf(err, "decode dest: %v", result)
return false
}
to, err = i.resolvePeer(ctx, d.Peer)
thread = d.Thread
default:
i.err = errors.Errorf("message router must return string or dest: %T", result)
return false
}
var to peers.Peer
if destPeer == "" { // self
to, err = i.opts.manager.Self(ctx)
} else {
to, err = utils.Telegram.GetInputPeer(ctx, i.opts.manager, destPeer)
}
if err != nil {
i.err = errors.Wrapf(err, "resolve dest peer: %s", destPeer)
i.err = errors.Wrapf(err, "resolve dest: %v", result)
return false
}
i.elem = &iterElem{
from: from,
msg: msg,
to: to,
opts: i.opts,
from: from,
msg: msg,
to: to,
thread: thread,
opts: i.opts,
}
return true
}
func (i *iter) resolvePeer(ctx context.Context, peer string) (peers.Peer, error) {
if peer == "" { // self
return i.opts.manager.Self(ctx)
}
return utils.Telegram.GetInputPeer(ctx, i.opts.manager, peer)
}
func (i *iter) Value() forwarder.Elem {
return i.elem
}

View File

@ -7,7 +7,6 @@ import (
"github.com/go-faster/errors"
"github.com/gotd/td/bin"
"github.com/gotd/td/telegram/message"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/tg"
"go.uber.org/atomic"
@ -121,7 +120,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t
Noforwards: false,
UpdateStickersetsOrder: false,
Peer: elem.To().InputPeer(),
ReplyTo: nil,
ReplyTo: getReplyTo(elem.Thread()),
Message: msg.Message,
RandomID: f.rand.Int63(),
ReplyMarkup: msg.ReplyMarkup,
@ -252,10 +251,27 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t
case ModeDirect:
// it can be forwarded via API
if !protectedDialog(elem.From()) && !protectedMessage(elem.Msg()) {
builder := message.NewSender(f.forwardClient(ctx, elem)).
To(elem.To().InputPeer()).CloneBuilder()
if elem.AsSilent() {
builder = builder.Silent()
directForward := func(ids ...int) error {
req := &tg.MessagesForwardMessagesRequest{
Silent: elem.AsSilent(),
Background: false,
WithMyScore: false,
DropAuthor: false,
DropMediaCaptions: false,
Noforwards: false,
FromPeer: elem.From().InputPeer(),
ID: ids,
RandomID: nil,
ToPeer: elem.To().InputPeer(),
TopMsgID: elem.Thread(),
ScheduleDate: 0,
SendAs: nil,
}
req.SetFlags()
if _, err := f.forwardClient(ctx, elem).MessagesForwardMessages(ctx, req); err != nil {
return errors.Wrap(err, "directly forward")
}
return nil
}
if len(grouped) > 0 {
@ -264,14 +280,14 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t
ids = append(ids, m.ID)
}
if _, err := builder.ForwardIDs(elem.From().InputPeer(), ids[0], ids[1:]...).Send(ctx); err != nil {
if err = directForward(ids...); err != nil {
goto fallback
}
return nil
}
if _, err := builder.ForwardIDs(elem.From().InputPeer(), elem.Msg().ID).Send(ctx); err != nil {
if err = directForward(elem.Msg().ID); err != nil {
goto fallback
}
return nil
@ -307,7 +323,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t
Noforwards: false,
UpdateStickersetsOrder: false,
Peer: elem.To().InputPeer(),
ReplyTo: nil,
ReplyTo: getReplyTo(elem.Thread()),
MultiMedia: media,
ScheduleDate: 0,
SendAs: nil,
@ -335,7 +351,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*t
Noforwards: false,
UpdateStickersetsOrder: false,
Peer: elem.To().InputPeer(),
ReplyTo: nil,
ReplyTo: getReplyTo(elem.Thread()),
Media: media,
Message: elem.Msg().Message,
RandomID: rand.Int63(),
@ -439,3 +455,12 @@ func mediaSizeSum(msg *tg.Message, grouped ...*tg.Message) (int64, error) {
return m.Size, nil
}
func getReplyTo(thread int) tg.InputReplyToClass {
replyTo := &tg.InputReplyToMessage{
ReplyToMsgID: thread,
}
replyTo.SetFlags()
return replyTo
}

View File

@ -19,6 +19,7 @@ type Elem interface {
From() peers.Peer
Msg() *tg.Message
To() peers.Peer
Thread() int // reply to message/topic
AsSilent() bool
AsDryRun() bool