Skip to content

Commit d7c730d

Browse files
authored
feat(webhook): add message webhook feature (#225)
1 parent aee9a53 commit d7c730d

File tree

12 files changed

+296
-32
lines changed

12 files changed

+296
-32
lines changed

internal/chatlog/conf/server.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@ const (
55
)
66

77
type ServerConfig struct {
8-
Type string `mapstructure:"type"`
9-
Platform string `mapstructure:"platform"`
10-
Version int `mapstructure:"version"`
11-
FullVersion string `mapstructure:"full_version"`
12-
DataDir string `mapstructure:"data_dir"`
13-
DataKey string `mapstructure:"data_key"`
14-
ImgKey string `mapstructure:"img_key"`
15-
WorkDir string `mapstructure:"work_dir"`
16-
HTTPAddr string `mapstructure:"http_addr"`
17-
AutoDecrypt bool `mapstructure:"auto_decrypt"`
8+
Type string `mapstructure:"type"`
9+
Platform string `mapstructure:"platform"`
10+
Version int `mapstructure:"version"`
11+
FullVersion string `mapstructure:"full_version"`
12+
DataDir string `mapstructure:"data_dir"`
13+
DataKey string `mapstructure:"data_key"`
14+
ImgKey string `mapstructure:"img_key"`
15+
WorkDir string `mapstructure:"work_dir"`
16+
HTTPAddr string `mapstructure:"http_addr"`
17+
AutoDecrypt bool `mapstructure:"auto_decrypt"`
18+
Webhook *Webhook `mapstructure:"webhook"`
1819
}
1920

2021
var ServerDefaults = map[string]any{}
@@ -53,3 +54,7 @@ func (c *ServerConfig) GetHTTPAddr() string {
5354
}
5455
return c.HTTPAddr
5556
}
57+
58+
func (c *ServerConfig) GetWebhook() *Webhook {
59+
return c.Webhook
60+
}

internal/chatlog/conf/tui.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ type TUIConfig struct {
44
ConfigDir string `mapstructure:"-"`
55
LastAccount string `mapstructure:"last_account" json:"last_account"`
66
History []ProcessConfig `mapstructure:"history" json:"history"`
7+
Webhook *Webhook `mapstructure:"webhook" json:"webhook"`
78
}
89

910
var TUIDefaults = map[string]any{}

internal/chatlog/conf/webhook.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package conf
2+
3+
type Webhook struct {
4+
Host string `mapstructure:"host"`
5+
DelayMs int64 `mapstructure:"delay_ms"`
6+
Items []*WebhookItem `mapstructure:"items"`
7+
}
8+
9+
type WebhookItem struct {
10+
Type string `mapstructure:"type"`
11+
URL string `mapstructure:"url"`
12+
Talker string `mapstructure:"talker"`
13+
Sender string `mapstructure:"sender"`
14+
Keyword string `mapstructure:"keyword"`
15+
Disabled bool `mapstructure:"disabled"`
16+
}

internal/chatlog/ctx/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ func (c *Context) GetHTTPAddr() string {
181181
return c.HTTPAddr
182182
}
183183

184+
func (c *Context) GetWebhook() *conf.Webhook {
185+
return c.conf.Webhook
186+
}
187+
184188
func (c *Context) SetHTTPEnabled(enabled bool) {
185189
c.mu.Lock()
186190
defer c.mu.Unlock()

internal/chatlog/database/service.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
package database
22

33
import (
4+
"context"
45
"time"
56

7+
"github.com/rs/zerolog/log"
8+
9+
"github.com/sjzar/chatlog/internal/chatlog/conf"
10+
"github.com/sjzar/chatlog/internal/chatlog/webhook"
611
"github.com/sjzar/chatlog/internal/model"
712
"github.com/sjzar/chatlog/internal/wechatdb"
813
)
@@ -15,21 +20,25 @@ const (
1520
)
1621

1722
type Service struct {
18-
State int
19-
StateMsg string
20-
conf Config
21-
db *wechatdb.DB
23+
State int
24+
StateMsg string
25+
conf Config
26+
db *wechatdb.DB
27+
webhook *webhook.Service
28+
webhookCancel context.CancelFunc
2229
}
2330

2431
type Config interface {
2532
GetWorkDir() string
2633
GetPlatform() string
2734
GetVersion() int
35+
GetWebhook() *conf.Webhook
2836
}
2937

3038
func NewService(conf Config) *Service {
3139
return &Service{
32-
conf: conf,
40+
conf: conf,
41+
webhook: webhook.New(conf),
3342
}
3443
}
3544

@@ -40,6 +49,7 @@ func (s *Service) Start() error {
4049
}
4150
s.SetReady()
4251
s.db = db
52+
s.initWebhook()
4353
return nil
4454
}
4555

@@ -49,6 +59,10 @@ func (s *Service) Stop() error {
4959
}
5060
s.SetInit()
5161
s.db = nil
62+
if s.webhookCancel != nil {
63+
s.webhookCancel()
64+
s.webhookCancel = nil
65+
}
5266
return nil
5367
}
5468

@@ -94,8 +108,28 @@ func (s *Service) GetMedia(_type string, key string) (*model.Media, error) {
94108
return s.db.GetMedia(_type, key)
95109
}
96110

111+
func (s *Service) initWebhook() error {
112+
if s.webhook == nil {
113+
return nil
114+
}
115+
ctx, cancel := context.WithCancel(context.Background())
116+
s.webhookCancel = cancel
117+
hooks := s.webhook.GetHooks(ctx, s.db)
118+
for _, hook := range hooks {
119+
if err := s.db.SetCallback(hook.Group(), hook.Callback); err != nil {
120+
log.Error().Err(err).Msgf("set callback %#v failed", hook)
121+
return err
122+
}
123+
}
124+
return nil
125+
}
126+
97127
// Close closes the database connection
98128
func (s *Service) Close() {
99129
// Add cleanup code if needed
100130
s.db.Close()
131+
if s.webhookCancel != nil {
132+
s.webhookCancel()
133+
s.webhookCancel = nil
134+
}
101135
}

internal/chatlog/webhook/webhook.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package webhook
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"net/http"
8+
"time"
9+
10+
"github.com/fsnotify/fsnotify"
11+
"github.com/rs/zerolog/log"
12+
13+
"github.com/sjzar/chatlog/internal/chatlog/conf"
14+
"github.com/sjzar/chatlog/internal/wechatdb"
15+
)
16+
17+
type Config interface {
18+
GetWebhook() *conf.Webhook
19+
}
20+
21+
type Webhook interface {
22+
Do(event fsnotify.Event)
23+
}
24+
25+
type Service struct {
26+
config *conf.Webhook
27+
hooks map[string][]*conf.WebhookItem
28+
}
29+
30+
func New(config Config) *Service {
31+
s := &Service{
32+
config: config.GetWebhook(),
33+
}
34+
35+
if s.config == nil {
36+
return s
37+
}
38+
39+
hooks := make(map[string][]*conf.WebhookItem)
40+
for _, item := range s.config.Items {
41+
if item.Disabled {
42+
continue
43+
}
44+
if item.Type == "" {
45+
item.Type = "message"
46+
}
47+
switch item.Type {
48+
case "message":
49+
if hooks["message"] == nil {
50+
hooks["message"] = make([]*conf.WebhookItem, 0)
51+
}
52+
hooks["message"] = append(hooks["message"], item)
53+
default:
54+
log.Error().Msgf("unknown webhook type: %s", item.Type)
55+
}
56+
}
57+
s.hooks = hooks
58+
59+
return s
60+
}
61+
62+
func (s *Service) GetHooks(ctx context.Context, db *wechatdb.DB) []*Group {
63+
64+
if len(s.hooks) == 0 {
65+
return nil
66+
}
67+
68+
groups := make([]*Group, 0)
69+
for group, items := range s.hooks {
70+
hooks := make([]Webhook, 0)
71+
for _, item := range items {
72+
hooks = append(hooks, NewMessageWebhook(item, db, s.config.Host))
73+
}
74+
groups = append(groups, NewGroup(ctx, group, hooks, s.config.DelayMs))
75+
}
76+
77+
return groups
78+
}
79+
80+
type Group struct {
81+
ctx context.Context
82+
group string
83+
hooks []Webhook
84+
delayMs int64
85+
ch chan fsnotify.Event
86+
}
87+
88+
func NewGroup(ctx context.Context, group string, hooks []Webhook, delayMs int64) *Group {
89+
g := &Group{
90+
group: group,
91+
hooks: hooks,
92+
delayMs: delayMs,
93+
ctx: ctx,
94+
ch: make(chan fsnotify.Event, 1),
95+
}
96+
go g.loop()
97+
return g
98+
}
99+
100+
func (g *Group) Callback(event fsnotify.Event) error {
101+
// skip remove event
102+
if !event.Op.Has(fsnotify.Create) {
103+
return nil
104+
}
105+
106+
select {
107+
case g.ch <- event:
108+
default:
109+
}
110+
return nil
111+
}
112+
113+
func (g *Group) Group() string {
114+
return g.group
115+
}
116+
117+
func (g *Group) loop() {
118+
for {
119+
select {
120+
case event, ok := <-g.ch:
121+
if !ok {
122+
return
123+
}
124+
if g.delayMs > 0 {
125+
time.Sleep(time.Duration(g.delayMs) * time.Millisecond)
126+
}
127+
g.do(event)
128+
case <-g.ctx.Done():
129+
return
130+
}
131+
}
132+
}
133+
134+
func (g *Group) do(event fsnotify.Event) {
135+
for _, hook := range g.hooks {
136+
go hook.Do(event)
137+
}
138+
}
139+
140+
type MessageWebhook struct {
141+
host string
142+
conf *conf.WebhookItem
143+
client *http.Client
144+
db *wechatdb.DB
145+
lastTime time.Time
146+
}
147+
148+
func NewMessageWebhook(conf *conf.WebhookItem, db *wechatdb.DB, host string) *MessageWebhook {
149+
m := &MessageWebhook{
150+
host: host,
151+
conf: conf,
152+
client: &http.Client{Timeout: time.Second * 10},
153+
db: db,
154+
lastTime: time.Now(),
155+
}
156+
return m
157+
}
158+
159+
func (m *MessageWebhook) Do(event fsnotify.Event) {
160+
messages, err := m.db.GetMessages(m.lastTime, time.Now().Add(time.Minute*10), m.conf.Talker, m.conf.Sender, m.conf.Keyword, 0, 0)
161+
if err != nil {
162+
log.Error().Err(err).Msgf("get messages failed")
163+
return
164+
}
165+
166+
if len(messages) == 0 {
167+
return
168+
}
169+
170+
m.lastTime = messages[len(messages)-1].Time.Add(time.Second)
171+
172+
for _, message := range messages {
173+
message.SetContent("host", m.host)
174+
message.Content = message.PlainTextContent()
175+
}
176+
177+
ret := map[string]any{
178+
"talker": m.conf.Talker,
179+
"sender": m.conf.Sender,
180+
"keyword": m.conf.Keyword,
181+
"lastTime": m.lastTime.Format(time.DateTime),
182+
"length": len(messages),
183+
"messages": messages,
184+
}
185+
body, _ := json.Marshal(ret)
186+
req, _ := http.NewRequest("POST", m.conf.URL, bytes.NewBuffer(body))
187+
req.Header.Set("Content-Type", "application/json")
188+
189+
log.Debug().Msgf("post messages to %s, body: %s", m.conf.URL, string(body))
190+
resp, err := m.client.Do(req)
191+
if err != nil {
192+
log.Error().Err(err).Msgf("post messages failed")
193+
}
194+
defer resp.Body.Close()
195+
196+
if resp.StatusCode != http.StatusOK {
197+
log.Error().Msgf("post messages failed, status code: %d", resp.StatusCode)
198+
}
199+
}

internal/wechatdb/datasource/darwinv3/datasource.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ func New(path string) (*DataSource, error) {
109109
return ds, nil
110110
}
111111

112-
func (ds *DataSource) SetCallback(name string, callback func(event fsnotify.Event) error) error {
113-
return ds.dbm.AddCallback(name, callback)
112+
func (ds *DataSource) SetCallback(group string, callback func(event fsnotify.Event) error) error {
113+
return ds.dbm.AddCallback(group, callback)
114114
}
115115

116116
func (ds *DataSource) initMessageDbs() error {

internal/wechatdb/datasource/datasource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type DataSource interface {
3131
GetMedia(ctx context.Context, _type string, key string) (*model.Media, error)
3232

3333
// 设置回调函数
34-
SetCallback(name string, callback func(event fsnotify.Event) error) error
34+
SetCallback(group string, callback func(event fsnotify.Event) error) error
3535

3636
Close() error
3737
}

0 commit comments

Comments
 (0)