refactor(forward): extract to interface

This commit is contained in:
iyear 2023-11-27 11:24:47 +08:00
parent e69f704780
commit ffc97c26a4
7 changed files with 128 additions and 79 deletions

28
app/forward/elem.go Normal file
View File

@ -0,0 +1,28 @@
package forward
import (
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/tg"
"github.com/iyear/tdl/pkg/forwarder"
)
type iterElem struct {
from peers.Peer
msg *tg.Message
to peers.Peer
opts iterOptions
}
func (i *iterElem) Mode() forwarder.Mode { return i.opts.mode }
func (i *iterElem) From() peers.Peer { return i.from }
func (i *iterElem) Msg() *tg.Message { return i.msg }
func (i *iterElem) To() peers.Peer { return i.to }
func (i *iterElem) AsSilent() bool { return i.opts.silent }
func (i *iterElem) AsDryRun() bool { return i.opts.dryRun }

View File

@ -78,6 +78,8 @@ func Run(ctx context.Context, opts Options) error {
}
fwProgress := prog.New(pw.FormatNumber)
fwProgress.SetNumTrackersExpected(totalMessages(dialogs))
prog.EnablePS(ctx, fwProgress)
fw := forwarder.New(forwarder.Options{
Pool: pool,
@ -155,3 +157,11 @@ func resolveDestPeer(ctx context.Context, manager *peers.Manager, input string)
// text
return compile(input)
}
func totalMessages(dialogs []*tmessage.Dialog) int {
var total int
for _, d := range dialogs {
total += len(d.Messages)
}
return total
}

View File

@ -29,7 +29,7 @@ type iter struct {
opts iterOptions
i, j int
elem *forwarder.Elem
elem forwarder.Elem
err error
}
@ -125,19 +125,17 @@ func (i *iter) Next(ctx context.Context) bool {
return false
}
i.elem = &forwarder.Elem{
From: from,
Msg: msg,
To: to,
Silent: i.opts.silent,
DryRun: i.opts.dryRun,
Mode: i.opts.mode,
i.elem = &iterElem{
from: from,
msg: msg,
to: to,
opts: i.opts,
}
return true
}
func (i *iter) Value() *forwarder.Elem {
func (i *iter) Value() forwarder.Elem {
return i.elem
}

View File

@ -15,7 +15,7 @@ import (
type progress struct {
pw pw.Writer
trackers map[tuple]*pw.Tracker
trackers map[tuple]*pw.Tracker // TODO(iyear): concurrent map
elemName map[int64]string
}
@ -33,12 +33,12 @@ func newProgress(p pw.Writer) *progress {
}
}
func (p *progress) OnAdd(elem *forwarder.Elem) {
func (p *progress) OnAdd(elem forwarder.Elem) {
tracker := prog.AppendTracker(p.pw, pw.FormatNumber, p.processMessage(elem, false), 1)
p.trackers[p.tuple(elem)] = tracker
}
func (p *progress) OnClone(elem *forwarder.Elem, state forwarder.ProgressState) {
func (p *progress) OnClone(elem forwarder.Elem, state forwarder.ProgressState) {
tracker, ok := p.trackers[p.tuple(elem)]
if !ok {
return
@ -51,7 +51,7 @@ func (p *progress) OnClone(elem *forwarder.Elem, state forwarder.ProgressState)
tracker.SetValue(state.Done)
}
func (p *progress) OnDone(elem *forwarder.Elem, err error) {
func (p *progress) OnDone(elem forwarder.Elem, err error) {
tracker, ok := p.trackers[p.tuple(elem)]
if !ok {
return
@ -66,15 +66,15 @@ func (p *progress) OnDone(elem *forwarder.Elem, err error) {
tracker.Increment(1)
}
func (p *progress) tuple(elem *forwarder.Elem) tuple {
func (p *progress) tuple(elem forwarder.Elem) tuple {
return tuple{
from: elem.From.ID(),
msg: elem.Msg.ID,
to: elem.To.ID(),
from: elem.From().ID(),
msg: elem.Msg().ID,
to: elem.To().ID(),
}
}
func (p *progress) processMessage(elem *forwarder.Elem, clone bool) string {
func (p *progress) processMessage(elem forwarder.Elem, clone bool) string {
b := &strings.Builder{}
b.WriteString(p.metaString(elem))
@ -85,14 +85,19 @@ func (p *progress) processMessage(elem *forwarder.Elem, clone bool) string {
return b.String()
}
func (p *progress) metaString(elem *forwarder.Elem) string {
func (p *progress) metaString(elem forwarder.Elem) string {
// TODO(iyear): better responsive name
if _, ok := p.elemName[elem.From.ID()]; !ok {
p.elemName[elem.From.ID()] = runewidth.Truncate(elem.From.VisibleName(), 15, "...")
if _, ok := p.elemName[elem.From().ID()]; !ok {
p.elemName[elem.From().ID()] = runewidth.Truncate(elem.From().VisibleName(), 15, "...")
}
if _, ok := p.elemName[elem.To.ID()]; !ok {
p.elemName[elem.To.ID()] = runewidth.Truncate(elem.To.VisibleName(), 15, "...")
if _, ok := p.elemName[elem.To().ID()]; !ok {
p.elemName[elem.To().ID()] = runewidth.Truncate(elem.To().VisibleName(), 15, "...")
}
return fmt.Sprintf("%s(%d):%d -> %s(%d)", p.elemName[elem.From.ID()], elem.From.ID(), elem.Msg.ID, p.elemName[elem.To.ID()], elem.To.ID())
return fmt.Sprintf("%s(%d):%d -> %s(%d)",
p.elemName[elem.From().ID()],
elem.From().ID(),
elem.Msg().ID,
p.elemName[elem.To().ID()],
elem.To().ID())
}

View File

@ -2,7 +2,6 @@ package forwarder
import (
"context"
"fmt"
"math/rand"
"time"
@ -25,22 +24,6 @@ import (
// ENUM(direct, clone)
type Mode int
type Iter interface {
Next(ctx context.Context) bool
Value() *Elem
Err() error
}
type Elem struct {
From peers.Peer
Msg *tg.Message
To peers.Peer
Silent bool
DryRun bool
Mode Mode
}
type Options struct {
Pool dcpool.Pool
PartSize int
@ -65,13 +48,13 @@ func New(opts Options) *Forwarder {
func (f *Forwarder) Forward(ctx context.Context) error {
for f.opts.Iter.Next(ctx) {
elem := f.opts.Iter.Value()
if _, ok := f.sent[f.sentTuple(elem.From, elem.Msg)]; ok {
if _, ok := f.sent[f.sentTuple(elem.From(), elem.Msg())]; ok {
// skip grouped messages
continue
}
if _, ok := elem.Msg.GetGroupedID(); ok {
grouped, err := utils.Telegram.GetGroupedMessages(ctx, f.opts.Pool.Default(ctx), elem.From.InputPeer(), elem.Msg)
if _, ok := elem.Msg().GetGroupedID(); ok {
grouped, err := utils.Telegram.GetGroupedMessages(ctx, f.opts.Pool.Default(ctx), elem.From().InputPeer(), elem.Msg())
if err != nil {
continue
}
@ -95,22 +78,22 @@ func (f *Forwarder) Forward(ctx context.Context) error {
return f.opts.Iter.Err()
}
func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*tg.Message) (rerr error) {
func (f *Forwarder) forwardMessage(ctx context.Context, elem Elem, grouped ...*tg.Message) (rerr error) {
f.opts.Progress.OnAdd(elem)
defer func() {
f.sent[f.sentTuple(elem.From, elem.Msg)] = struct{}{}
f.sent[f.sentTuple(elem.From(), elem.Msg())] = struct{}{}
// grouped message also should be marked as sent
for _, m := range grouped {
f.sent[f.sentTuple(elem.From, m)] = struct{}{}
f.sent[f.sentTuple(elem.From(), m)] = struct{}{}
}
f.opts.Progress.OnDone(elem, rerr)
}()
log := logger.From(ctx).With(
zap.Int64("from", elem.From.ID()),
zap.Int64("to", elem.To.ID()),
zap.Int("message", elem.Msg.ID))
zap.Int64("from", elem.From().ID()),
zap.Int64("to", elem.To().ID()),
zap.Int("message", elem.Msg().ID))
forwardTextOnly := func(msg *tg.Message) error {
if msg.Message == "" {
@ -118,12 +101,12 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
}
req := &tg.MessagesSendMessageRequest{
NoWebpage: false,
Silent: elem.Silent,
Silent: elem.AsSilent(),
Background: false,
ClearDraft: false,
Noforwards: false,
UpdateStickersetsOrder: false,
Peer: elem.To.InputPeer(),
Peer: elem.To().InputPeer(),
ReplyTo: nil,
Message: msg.Message,
RandomID: f.rand.Int63(),
@ -152,7 +135,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
// we should clone photo and document via re-upload, it will be banned if we forward it directly.
// but other media can be forwarded directly via copy
if (!protectedDialog(elem.From) && !protectedMessage(msg)) || !photoOrDocument(msg.Media) {
if (!protectedDialog(elem.From()) && !protectedMessage(msg)) || !photoOrDocument(msg.Media) {
media, ok := tmedia.ConvInputMedia(msg.Media)
if !ok {
return nil, errors.Errorf("can't convert message %d to input class directly", msg.ID)
@ -163,7 +146,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
media, ok := tmedia.GetMedia(msg)
if !ok {
log.Warn("Can't get media from message",
zap.Int64("peer", elem.From.ID()),
zap.Int64("peer", elem.From().ID()),
zap.Int("message", msg.ID))
// unsupported re-upload media
@ -177,7 +160,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
elem: elem,
progress: f.opts.Progress,
},
}, elem.DryRun)
}, elem.AsDryRun())
if err != nil {
return nil, errors.Wrap(err, "clone media")
}
@ -207,7 +190,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
Media: thumb,
PartSize: f.opts.PartSize,
Progress: nopProgress{},
}, elem.DryRun)
}, elem.AsDryRun())
if err != nil {
return nil, errors.Wrap(err, "clone thumb")
}
@ -231,13 +214,13 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
}
}
switch elem.Mode {
switch elem.Mode() {
case ModeDirect:
// it can be forwarded via API
if !protectedDialog(elem.From) && !protectedMessage(elem.Msg) {
if !protectedDialog(elem.From()) && !protectedMessage(elem.Msg()) {
builder := message.NewSender(f.forwardClient(ctx, elem)).
To(elem.To.InputPeer()).CloneBuilder()
if elem.Silent {
To(elem.To().InputPeer()).CloneBuilder()
if elem.AsSilent() {
builder = builder.Silent()
}
@ -247,14 +230,14 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
ids = append(ids, m.ID)
}
if _, err := builder.ForwardIDs(elem.From.InputPeer(), ids[0], ids[1:]...).Send(ctx); err != nil {
if _, err := builder.ForwardIDs(elem.From().InputPeer(), ids[0], ids[1:]...).Send(ctx); err != nil {
goto fallback
}
return nil
}
if _, err := builder.ForwardIDs(elem.From.InputPeer(), elem.Msg.ID).Send(ctx); err != nil {
if _, err := builder.ForwardIDs(elem.From().InputPeer(), elem.Msg().ID).Send(ctx); err != nil {
goto fallback
}
return nil
@ -283,12 +266,12 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
if len(media) > 0 {
req := &tg.MessagesSendMultiMediaRequest{
Silent: elem.Silent,
Silent: elem.AsSilent(),
Background: false,
ClearDraft: false,
Noforwards: false,
UpdateStickersetsOrder: false,
Peer: elem.To.InputPeer(),
Peer: elem.To().InputPeer(),
ReplyTo: nil,
MultiMedia: media,
ScheduleDate: 0,
@ -301,28 +284,28 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
return nil
}
return forwardTextOnly(elem.Msg)
return forwardTextOnly(elem.Msg())
}
media, err := convForwardedMedia(elem.Msg)
media, err := convForwardedMedia(elem.Msg())
if err != nil {
log.Debug("Can't convert forwarded media", zap.Error(err))
return forwardTextOnly(elem.Msg)
return forwardTextOnly(elem.Msg())
}
// send text copy with forwarded media
req := &tg.MessagesSendMediaRequest{
Silent: elem.Silent,
Silent: elem.AsSilent(),
Background: false,
ClearDraft: false,
Noforwards: false,
UpdateStickersetsOrder: false,
Peer: elem.To.InputPeer(),
Peer: elem.To().InputPeer(),
ReplyTo: nil,
Media: media,
Message: elem.Msg.Message,
Message: elem.Msg().Message,
RandomID: rand.Int63(),
ReplyMarkup: elem.Msg.ReplyMarkup,
Entities: elem.Msg.Entities,
ReplyMarkup: elem.Msg().ReplyMarkup,
Entities: elem.Msg().Entities,
ScheduleDate: 0,
SendAs: nil,
}
@ -334,7 +317,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, elem *Elem, grouped ...*
return nil
}
return fmt.Errorf("unknown mode: %s", elem.Mode)
return errors.Errorf("unsupported mode %v", elem.Mode())
}
func (f *Forwarder) sentTuple(peer peers.Peer, msg *tg.Message) [2]int64 {
@ -347,8 +330,8 @@ func (n nopInvoker) Invoke(_ context.Context, _ bin.Encoder, _ bin.Decoder) erro
return nil
}
func (f *Forwarder) forwardClient(ctx context.Context, elem *Elem) *tg.Client {
if elem.DryRun {
func (f *Forwarder) forwardClient(ctx context.Context, elem Elem) *tg.Client {
if elem.AsDryRun() {
return tg.NewClient(nopInvoker{})
}

25
pkg/forwarder/iter.go Normal file
View File

@ -0,0 +1,25 @@
package forwarder
import (
"context"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/tg"
)
type Iter interface {
Next(ctx context.Context) bool
Value() Elem
Err() error
}
type Elem interface {
Mode() Mode
From() peers.Peer
Msg() *tg.Message
To() peers.Peer
AsSilent() bool
AsDryRun() bool
}

View File

@ -7,9 +7,9 @@ import (
)
type Progress interface {
OnAdd(elem *Elem)
OnClone(elem *Elem, state ProgressState)
OnDone(elem *Elem, err error)
OnAdd(elem Elem)
OnClone(elem Elem, state ProgressState)
OnDone(elem Elem, err error)
}
type ProgressState struct {
@ -18,7 +18,7 @@ type ProgressState struct {
}
type uploadProgress struct {
elem *Elem
elem Elem
progress Progress
}