refactor(chat): use tRun to start session

This commit is contained in:
iyear 2023-12-11 10:47:29 +08:00
parent 3f65a374fb
commit 62077ff467
4 changed files with 264 additions and 280 deletions

View File

@ -10,16 +10,15 @@ import (
"github.com/antonmedv/expr"
"github.com/fatih/color"
"github.com/go-faster/jx"
"github.com/gotd/contrib/middleware/ratelimit"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/telegram/query"
"github.com/gotd/td/telegram/query/messages"
"github.com/gotd/td/tg"
"github.com/jedib0t/go-pretty/v6/progress"
"go.uber.org/multierr"
"golang.org/x/time/rate"
"github.com/iyear/tdl/app/internal/tgc"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/texpr"
@ -29,11 +28,6 @@ import (
//go:generate go-enum --names --values --flag --nocase
const (
rateInterval = 550 * time.Millisecond
rateBucket = 2
)
type ExportOptions struct {
Type ExportType
Chat string
@ -60,12 +54,7 @@ type Message struct {
// ENUM(time, id, last)
type ExportType int
func Export(ctx context.Context, opts *ExportOptions) error {
c, kvd, err := tgc.NoLogin(ctx, ratelimit.New(rate.Every(rateInterval), rateBucket))
if err != nil {
return err
}
func Export(ctx context.Context, c *telegram.Client, kvd kv.KV, opts ExportOptions) (rerr error) {
// only output available fields
if opts.Filter == "-" {
fg := texpr.NewFieldsGetter(nil)
@ -84,155 +73,153 @@ func Export(ctx context.Context, opts *ExportOptions) error {
return fmt.Errorf("failed to compile filter: %w", err)
}
return tgc.RunWithAuth(ctx, c, func(ctx context.Context) (rerr error) {
var peer peers.Peer
var peer peers.Peer
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(c.API())
if opts.Chat == "" { // defaults to me(saved messages)
peer, err = manager.Self(ctx)
} else {
peer, err = utils.Telegram.GetInputPeer(ctx, manager, opts.Chat)
}
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(c.API())
if opts.Chat == "" { // defaults to me(saved messages)
peer, err = manager.Self(ctx)
} else {
peer, err = utils.Telegram.GetInputPeer(ctx, manager, opts.Chat)
}
if err != nil {
return fmt.Errorf("failed to get peer: %w", err)
}
color.Yellow("WARN: Export only generates minimal JSON for tdl download, not for backup.")
color.Cyan("Occasional suspensions are due to Telegram rate limitations, please wait a moment.")
fmt.Println()
color.Blue("Type: %s | Input: %v", opts.Type, opts.Input)
pw := prog.New(progress.FormatNumber)
pw.SetUpdateFrequency(200 * time.Millisecond)
pw.Style().Visibility.TrackerOverall = false
pw.Style().Visibility.ETA = false
pw.Style().Visibility.Percentage = false
tracker := prog.AppendTracker(pw, progress.FormatNumber, fmt.Sprintf("%s-%d", peer.VisibleName(), peer.ID()), 0)
go pw.Render()
var q messages.Query
switch {
case opts.Thread != 0: // topic messages, reply messages
q = query.NewQuery(c.API()).Messages().GetReplies(peer.InputPeer()).MsgID(opts.Thread)
default: // history
q = query.NewQuery(c.API()).Messages().GetHistory(peer.InputPeer())
}
iter := messages.NewIterator(q, 100)
switch opts.Type {
case ExportTypeTime:
iter = iter.OffsetDate(opts.Input[1] + 1)
case ExportTypeId:
iter = iter.OffsetID(opts.Input[1] + 1) // #89: retain the last msg id
case ExportTypeLast:
}
f, err := os.Create(opts.Output)
if err != nil {
return err
}
defer multierr.AppendInvoke(&rerr, multierr.Close(f))
enc := jx.NewStreamingEncoder(f, 512)
defer multierr.AppendInvoke(&rerr, multierr.Close(enc))
// process thread is reply type and peer is broadcast channel,
// so we need to set discussion group id instead of broadcast id
id := peer.ID()
if p, ok := peer.(peers.Channel); opts.Thread != 0 && ok && p.IsBroadcast() {
bc, _ := p.ToBroadcast()
raw, err := bc.FullRaw(ctx)
if err != nil {
return fmt.Errorf("failed to get peer: %w", err)
return fmt.Errorf("failed to get broadcast full raw: %w", err)
}
color.Yellow("WARN: Export only generates minimal JSON for tdl download, not for backup.")
color.Cyan("Occasional suspensions are due to Telegram rate limitations, please wait a moment.")
fmt.Println()
color.Blue("Type: %s | Input: %v", opts.Type, opts.Input)
pw := prog.New(progress.FormatNumber)
pw.SetUpdateFrequency(200 * time.Millisecond)
pw.Style().Visibility.TrackerOverall = false
pw.Style().Visibility.ETA = false
pw.Style().Visibility.Percentage = false
tracker := prog.AppendTracker(pw, progress.FormatNumber, fmt.Sprintf("%s-%d", peer.VisibleName(), peer.ID()), 0)
go pw.Render()
var q messages.Query
switch {
case opts.Thread != 0: // topic messages, reply messages
q = query.NewQuery(c.API()).Messages().GetReplies(peer.InputPeer()).MsgID(opts.Thread)
default: // history
q = query.NewQuery(c.API()).Messages().GetHistory(peer.InputPeer())
if id, ok = raw.GetLinkedChatID(); !ok {
return fmt.Errorf("no linked group")
}
iter := messages.NewIterator(q, 100)
}
enc.ObjStart()
defer enc.ObjEnd()
enc.Field("id", func(e *jx.Encoder) { e.Int64(id) })
enc.FieldStart("messages")
enc.ArrStart()
defer enc.ArrEnd()
count := int64(0)
loop:
for iter.Next(ctx) {
msg := iter.Value()
switch opts.Type {
case ExportTypeTime:
iter = iter.OffsetDate(opts.Input[1] + 1)
if msg.Msg.GetDate() < opts.Input[0] {
break loop
}
case ExportTypeId:
iter = iter.OffsetID(opts.Input[1] + 1) // #89: retain the last msg id
if msg.Msg.GetID() < opts.Input[0] {
break loop
}
case ExportTypeLast:
if count >= int64(opts.Input[0]) {
break loop
}
}
f, err := os.Create(opts.Output)
m, ok := msg.Msg.(*tg.Message)
if !ok {
continue
}
// only get media messages
media, ok := tmedia.GetMedia(m)
if !ok && !opts.All {
continue
}
b, err := texpr.Run(filter, texpr.ConvertEnvMessage(m))
if err != nil {
return err
return fmt.Errorf("failed to run filter: %w", err)
}
defer multierr.AppendInvoke(&rerr, multierr.Close(f))
enc := jx.NewStreamingEncoder(f, 512)
defer multierr.AppendInvoke(&rerr, multierr.Close(enc))
// process thread is reply type and peer is broadcast channel,
// so we need to set discussion group id instead of broadcast id
id := peer.ID()
if p, ok := peer.(peers.Channel); opts.Thread != 0 && ok && p.IsBroadcast() {
bc, _ := p.ToBroadcast()
raw, err := bc.FullRaw(ctx)
if err != nil {
return fmt.Errorf("failed to get broadcast full raw: %w", err)
}
if id, ok = raw.GetLinkedChatID(); !ok {
return fmt.Errorf("no linked group")
}
if !b.(bool) { // filtered
continue
}
enc.ObjStart()
defer enc.ObjEnd()
enc.Field("id", func(e *jx.Encoder) { e.Int64(id) })
enc.FieldStart("messages")
enc.ArrStart()
defer enc.ArrEnd()
count := int64(0)
loop:
for iter.Next(ctx) {
msg := iter.Value()
switch opts.Type {
case ExportTypeTime:
if msg.Msg.GetDate() < opts.Input[0] {
break loop
}
case ExportTypeId:
if msg.Msg.GetID() < opts.Input[0] {
break loop
}
case ExportTypeLast:
if count >= int64(opts.Input[0]) {
break loop
}
}
m, ok := msg.Msg.(*tg.Message)
if !ok {
continue
}
// only get media messages
media, ok := tmedia.GetMedia(m)
if !ok && !opts.All {
continue
}
b, err := texpr.Run(filter, texpr.ConvertEnvMessage(m))
if err != nil {
return fmt.Errorf("failed to run filter: %w", err)
}
if !b.(bool) { // filtered
continue
}
fileName := ""
if media != nil { // #207
fileName = media.Name
}
t := &Message{
ID: m.ID,
Type: "message",
File: fileName,
}
if opts.WithContent {
t.Date = m.Date
t.Text = m.Message
}
if opts.Raw {
t.Raw = m
}
mb, err := json.Marshal(t)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
enc.Raw(mb)
count++
tracker.SetValue(count)
fileName := ""
if media != nil { // #207
fileName = media.Name
}
t := &Message{
ID: m.ID,
Type: "message",
File: fileName,
}
if opts.WithContent {
t.Date = m.Date
t.Text = m.Message
}
if opts.Raw {
t.Raw = m
}
if err = iter.Err(); err != nil {
return err
mb, err := json.Marshal(t)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
enc.Raw(mb)
tracker.MarkAsDone()
prog.Wait(ctx, pw)
return nil
})
count++
tracker.SetValue(count)
}
if err = iter.Err(); err != nil {
return err
}
tracker.MarkAsDone()
prog.Wait(ctx, pw)
return nil
}

View File

@ -6,19 +6,17 @@ import (
"fmt"
"strconv"
"strings"
"time"
"github.com/antonmedv/expr"
"github.com/gotd/contrib/middleware/ratelimit"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/message/peer"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/telegram/query"
"github.com/gotd/td/tg"
"github.com/mattn/go-runewidth"
"go.uber.org/zap"
"golang.org/x/time/rate"
"github.com/iyear/tdl/app/internal/tgc"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/texpr"
@ -57,7 +55,7 @@ type ListOptions struct {
Filter string
}
func List(ctx context.Context, opts ListOptions) error {
func List(ctx context.Context, c *telegram.Client, kvd kv.KV, opts ListOptions) error {
log := logger.From(ctx)
// align output
@ -81,80 +79,73 @@ func List(ctx context.Context, opts ListOptions) error {
return fmt.Errorf("failed to compile filter: %w", err)
}
c, kvd, err := tgc.NoLogin(ctx, ratelimit.New(rate.Every(time.Millisecond*400), 2))
dialogs, err := query.GetDialogs(c.API()).BatchSize(100).Collect(ctx)
if err != nil {
return err
}
return tgc.RunWithAuth(ctx, c, func(ctx context.Context) error {
dialogs, err := query.GetDialogs(c.API()).BatchSize(100).Collect(ctx)
blocked, err := utils.Telegram.GetBlockedDialogs(ctx, c.API())
if err != nil {
return err
}
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(c.API())
result := make([]*Dialog, 0, len(dialogs))
for _, d := range dialogs {
id := utils.Telegram.GetInputPeerID(d.Peer)
// we can update our access hash state if there is any new peer.
if err = applyPeers(ctx, manager, d.Entities, id); err != nil {
log.Warn("failed to apply peer updates", zap.Int64("id", id), zap.Error(err))
}
// filter blocked peers
if _, ok := blocked[id]; ok {
continue
}
var r *Dialog
switch t := d.Peer.(type) {
case *tg.InputPeerUser:
r = processUser(t.UserID, d.Entities)
case *tg.InputPeerChannel:
r = processChannel(ctx, c.API(), t.ChannelID, d.Entities)
case *tg.InputPeerChat:
r = processChat(t.ChatID, d.Entities)
}
// skip unsupported types
if r == nil {
continue
}
// filter
b, err := texpr.Run(filter, r)
if err != nil {
return err
return fmt.Errorf("failed to run filter: %w", err)
}
if !b.(bool) {
continue
}
blocked, err := utils.Telegram.GetBlockedDialogs(ctx, c.API())
result = append(result, r)
}
switch opts.Output {
case ListOutputTable:
printTable(result)
case ListOutputJson:
bytes, err := json.MarshalIndent(result, "", "\t")
if err != nil {
return err
return fmt.Errorf("marshal json: %w", err)
}
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(c.API())
result := make([]*Dialog, 0, len(dialogs))
for _, d := range dialogs {
id := utils.Telegram.GetInputPeerID(d.Peer)
fmt.Println(string(bytes))
default:
return fmt.Errorf("unknown output: %s", opts.Output)
}
// we can update our access hash state if there is any new peer.
if err = applyPeers(ctx, manager, d.Entities, id); err != nil {
log.Warn("failed to apply peer updates", zap.Int64("id", id), zap.Error(err))
}
// filter blocked peers
if _, ok := blocked[id]; ok {
continue
}
var r *Dialog
switch t := d.Peer.(type) {
case *tg.InputPeerUser:
r = processUser(t.UserID, d.Entities)
case *tg.InputPeerChannel:
r = processChannel(ctx, c.API(), t.ChannelID, d.Entities)
case *tg.InputPeerChat:
r = processChat(t.ChatID, d.Entities)
}
// skip unsupported types
if r == nil {
continue
}
// filter
b, err := texpr.Run(filter, r)
if err != nil {
return fmt.Errorf("failed to run filter: %w", err)
}
if !b.(bool) {
continue
}
result = append(result, r)
}
switch opts.Output {
case ListOutputTable:
printTable(result)
case ListOutputJson:
bytes, err := json.MarshalIndent(result, "", "\t")
if err != nil {
return fmt.Errorf("marshal json: %w", err)
}
fmt.Println(string(bytes))
default:
return fmt.Errorf("unknown output: %s", opts.Output)
}
return nil
})
return nil
}
func printTable(result []*Dialog) {

View File

@ -10,16 +10,15 @@ import (
"github.com/fatih/color"
"github.com/go-faster/errors"
"github.com/go-faster/jx"
"github.com/gotd/contrib/middleware/ratelimit"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/telegram/query/channels/participants"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
"github.com/jedib0t/go-pretty/v6/progress"
"go.uber.org/multierr"
"golang.org/x/time/rate"
"github.com/iyear/tdl/app/internal/tgc"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/utils"
@ -39,80 +38,73 @@ type User struct {
LastName string `json:"last_name"`
}
func Users(ctx context.Context, opts UsersOptions) error {
c, kvd, err := tgc.NoLogin(ctx, ratelimit.New(rate.Every(rateInterval), rateBucket))
func Users(ctx context.Context, c *telegram.Client, kvd kv.KV, opts UsersOptions) (rerr error) {
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(c.API())
if opts.Chat == "" {
return fmt.Errorf("missing domain id")
}
peer, err := utils.Telegram.GetInputPeer(ctx, manager, opts.Chat)
if err != nil {
return fmt.Errorf("failed to get peer: %w", err)
}
ch, ok := peer.(peers.Channel)
if !ok {
return fmt.Errorf("invalid type of chat. channels/groups are supported only")
}
color.Cyan("Occasional suspensions are due to Telegram rate limitations, please wait a moment.")
fmt.Println()
f, err := os.Create(opts.Output)
if err != nil {
return err
}
defer multierr.AppendInvoke(&rerr, multierr.Close(f))
return tgc.RunWithAuth(ctx, c, func(ctx context.Context) (rerr error) {
manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(c.API())
if opts.Chat == "" {
return fmt.Errorf("missing domain id")
}
enc := jx.NewStreamingEncoder(f, 512)
defer multierr.AppendInvoke(&rerr, multierr.Close(enc))
peer, err := utils.Telegram.GetInputPeer(ctx, manager, opts.Chat)
if err != nil {
return fmt.Errorf("failed to get peer: %w", err)
}
enc.ObjStart()
defer enc.ObjEnd()
enc.Field("id", func(e *jx.Encoder) { e.Int64(peer.ID()) })
ch, ok := peer.(peers.Channel)
if !ok {
return fmt.Errorf("invalid type of chat. channels/groups are supported only")
}
pw := prog.New(progress.FormatNumber)
pw.SetUpdateFrequency(200 * time.Millisecond)
pw.Style().Visibility.TrackerOverall = false
pw.Style().Visibility.ETA = true
pw.Style().Visibility.Percentage = true
color.Cyan("Occasional suspensions are due to Telegram rate limitations, please wait a moment.")
fmt.Println()
go pw.Render()
f, err := os.Create(opts.Output)
if err != nil {
return err
}
defer multierr.AppendInvoke(&rerr, multierr.Close(f))
builder := func() *participants.GetParticipantsQueryBuilder {
return participants.NewQueryBuilder(c.API()).
GetParticipants(ch.InputChannel()).
BatchSize(100)
}
enc := jx.NewStreamingEncoder(f, 512)
defer multierr.AppendInvoke(&rerr, multierr.Close(enc))
fields := map[string]*participants.GetParticipantsQueryBuilder{
"users": builder(),
"admins": builder().Admins(),
"kicked": builder().Kicked(""),
"banned": builder().Banned(""),
"bots": builder().Bots(),
}
enc.ObjStart()
defer enc.ObjEnd()
enc.Field("id", func(e *jx.Encoder) { e.Int64(peer.ID()) })
pw := prog.New(progress.FormatNumber)
pw.SetUpdateFrequency(200 * time.Millisecond)
pw.Style().Visibility.TrackerOverall = false
pw.Style().Visibility.ETA = true
pw.Style().Visibility.Percentage = true
go pw.Render()
builder := func() *participants.GetParticipantsQueryBuilder {
return participants.NewQueryBuilder(c.API()).
GetParticipants(ch.InputChannel()).
BatchSize(100)
}
fields := map[string]*participants.GetParticipantsQueryBuilder{
"users": builder(),
"admins": builder().Admins(),
"kicked": builder().Kicked(""),
"banned": builder().Banned(""),
"bots": builder().Bots(),
}
for field, query := range fields {
iter := query.Iter()
if err = outputUsers(ctx, pw, peer, enc, field, iter, opts.Raw); err != nil {
// skip if we get CHAT_ADMIN_REQUIRED error, just export other fields
if tgerr.Is(err, tg.ErrChatAdminRequired) {
continue
}
return fmt.Errorf("failed to output %s: %w", field, err)
for field, query := range fields {
iter := query.Iter()
if err = outputUsers(ctx, pw, peer, enc, field, iter, opts.Raw); err != nil {
// skip if we get CHAT_ADMIN_REQUIRED error, just export other fields
if tgerr.Is(err, tg.ErrChatAdminRequired) {
continue
}
return fmt.Errorf("failed to output %s: %w", field, err)
}
}
prog.Wait(ctx, pw)
return nil
})
prog.Wait(ctx, pw)
return nil
}
func outputUsers(ctx context.Context,

View File

@ -1,16 +1,24 @@
package cmd
import (
"context"
"fmt"
"math"
"strings"
"time"
"github.com/gotd/contrib/middleware/ratelimit"
"github.com/gotd/td/telegram"
"github.com/spf13/cobra"
"golang.org/x/time/rate"
"github.com/iyear/tdl/app/chat"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
)
var limiter = ratelimit.New(rate.Every(500*time.Millisecond), 2)
func NewChat() *cobra.Command {
cmd := &cobra.Command{
Use: "chat",
@ -29,7 +37,9 @@ func NewChatList() *cobra.Command {
Use: "ls",
Short: "List your chats",
RunE: func(cmd *cobra.Command, args []string) error {
return chat.List(logger.Named(cmd.Context(), "ls"), opts)
return tRun(cmd.Context(), false, func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return chat.List(logger.Named(ctx, "ls"), c, kvd, opts)
}, limiter)
},
}
@ -72,7 +82,9 @@ func NewChatExport() *cobra.Command {
return fmt.Errorf("unknown export type: %s", opts.Type)
}
return chat.Export(logger.Named(cmd.Context(), "export"), &opts)
return tRun(cmd.Context(), false, func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return chat.Export(logger.Named(ctx, "export"), c, kvd, opts)
}, limiter)
},
}
@ -125,7 +137,9 @@ func NewChatUsers() *cobra.Command {
Use: "users",
Short: "export users from (protected) channels",
RunE: func(cmd *cobra.Command, args []string) error {
return chat.Users(logger.Named(cmd.Context(), "users"), opts)
return tRun(cmd.Context(), false, func(ctx context.Context, c *telegram.Client, kvd kv.KV) error {
return chat.Users(logger.Named(ctx, "users"), c, kvd, opts)
}, limiter)
},
}