Skip to content

Commit b3c0b6a

Browse files
author
Taois
committed
fix:修复range请求错误
1 parent 70cc648 commit b3c0b6a

File tree

5 files changed

+53
-33
lines changed

5 files changed

+53
-33
lines changed

mediaProxy/base/emitter.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,41 +2,52 @@ package base
22

33
import (
44
"io"
5+
"sync"
56
)
67

78
type Emitter struct {
89
pipeReader *io.PipeReader
910
pipeWriter *io.PipeWriter
1011
closed bool
12+
mutex sync.RWMutex
1113
}
1214

1315
func (em *Emitter) IsClosed() bool {
16+
em.mutex.RLock()
17+
defer em.mutex.RUnlock()
1418
return em.closed
1519
}
1620

1721
func (em *Emitter) Read(b []byte) (int, error) {
18-
n, err := em.pipeReader.Read(b)
19-
if err != nil {
20-
em.Close()
21-
return 0, err
22+
em.mutex.RLock()
23+
if em.closed {
24+
em.mutex.RUnlock()
25+
return 0, io.EOF
2226
}
23-
return n, nil
27+
em.mutex.RUnlock()
28+
return em.pipeReader.Read(b)
2429
}
2530

2631
func (em *Emitter) Write(b []byte) (int, error) {
27-
n, err := em.pipeWriter.Write(b)
28-
if err != nil {
29-
em.Close()
30-
return 0, err
32+
em.mutex.RLock()
33+
if em.closed {
34+
em.mutex.RUnlock()
35+
return 0, io.ErrClosedPipe
3136
}
32-
return n, nil
37+
em.mutex.RUnlock()
38+
return em.pipeWriter.Write(b)
3339
}
3440

3541
func (em *Emitter) WriteString(s string) (int, error) {
3642
return em.Write([]byte(s))
3743
}
3844

3945
func (em *Emitter) Close() error {
46+
em.mutex.Lock()
47+
defer em.mutex.Unlock()
48+
if em.closed {
49+
return nil // 已经关闭,直接返回
50+
}
4051
em.closed = true
4152
em.pipeReader.Close()
4253
em.pipeWriter.Close()
1.06 KB
Binary file not shown.

mediaProxy/build/mediaProxy-linux

112 Bytes
Binary file not shown.
2.26 MB
Binary file not shown.

mediaProxy/proxy.go

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ func ConcurrentDownload(downloadUrl string, rangeStart int64, rangeEnd int64, fi
121121

122122
defer func() {
123123
p.ProxyStop()
124+
emitter.Close() // 确保在函数结束时关闭emitter
124125
p = nil
125126
}()
126127

@@ -129,7 +130,6 @@ func ConcurrentDownload(downloadUrl string, rangeStart int64, rangeEnd int64, fi
129130

130131
if len(buffer) == 0 {
131132
p.ProxyStop()
132-
emitter.Close()
133133
logrus.Debugf("ProxyRead执行失败")
134134
buffer = nil
135135
return
@@ -139,15 +139,13 @@ func ConcurrentDownload(downloadUrl string, rangeStart int64, rangeEnd int64, fi
139139

140140
if err != nil {
141141
p.ProxyStop()
142-
emitter.Close()
143142
logrus.Errorf("emitter写入失败, 错误: %+v", err)
144143
buffer = nil
145144
return
146145
}
147146

148147
if p.CurrentOffset >= rangeEnd {
149148
p.ProxyStop()
150-
emitter.Close()
151149
logrus.Debugf("所有服务已经完成大小: %+v", totalLength)
152150
buffer = nil
153151
return
@@ -456,10 +454,10 @@ func handleGetMethod(w http.ResponseWriter, req *http.Request) {
456454
var responseHeaders interface{}
457455
responseHeaders, found = mediaCache.Get(headersKey)
458456
if !found || curTime-lastModified > 60 {
459-
// 关闭 Idle 超时设置
460-
base.IdleConnTimeout = 0
461-
resp, err := base.RestyClient.
462-
SetTimeout(0).
457+
// 创建专用的客户端用于获取头信息,避免修改全局设置
458+
headClient := base.NewRestyClient()
459+
resp, err := headClient.
460+
SetTimeout(30*time.Second).
463461
SetRetryCount(3).
464462
SetCookieJar(jar).
465463
R().
@@ -635,30 +633,40 @@ func handleGetMethod(w http.ResponseWriter, req *http.Request) {
635633
} else {
636634
splitSize = int64(128 * 1024)
637635
}
636+
637+
// 设置正确的Range响应头
638638
responseHeaders.(http.Header).Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", rangeStart, rangeEnd, contentSize))
639+
responseHeaders.(http.Header).Set("Content-Length", strconv.FormatInt(rangeEnd-rangeStart+1, 10))
640+
responseHeaders.(http.Header).Set("Accept-Ranges", "bytes")
641+
642+
// 先设置响应头,再开始数据传输
643+
for key, values := range responseHeaders.(http.Header) {
644+
if strings.EqualFold(strings.ToLower(key), "connection") || strings.EqualFold(strings.ToLower(key), "proxy-connection") {
645+
continue
646+
}
647+
w.Header().Set(key, strings.Join(values, ","))
648+
}
649+
w.Header().Set("Connection", "keep-alive")
650+
w.WriteHeader(statusCode) // 206 for partial content
639651

640652
rp, wp := io.Pipe()
641653
emitter := base.NewEmitter(rp, wp)
642654

643-
go ConcurrentDownload(url, rangeStart, rangeEnd, contentSize, splitSize, numTasks, emitter, req)
644-
io.Copy(pw, emitter)
645-
646655
defer func() {
647-
emitter.Close()
648-
logrus.Debugf("handleGetMethod emitter 已关闭-支持断点续传")
656+
if !emitter.IsClosed() {
657+
emitter.Close()
658+
logrus.Debugf("handleGetMethod emitter 已关闭-支持断点续传")
659+
}
649660
}()
650-
} else {
651-
statusCode = 200
652-
}
653661

654-
for key, values := range responseHeaders.(http.Header) {
655-
if strings.EqualFold(strings.ToLower(key), "connection") || strings.EqualFold(strings.ToLower(key), "proxy-connection") {
656-
continue
657-
}
658-
w.Header().Set(key, strings.Join(values, ","))
662+
go ConcurrentDownload(url, rangeStart, rangeEnd, contentSize, splitSize, numTasks, emitter, req)
663+
io.Copy(pw, emitter)
664+
} else {
665+
// Range超出文件大小,返回416错误
666+
statusCode = 416
667+
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", contentSize))
668+
w.WriteHeader(statusCode)
659669
}
660-
w.Header().Set("Connection", "keep-alive")
661-
w.WriteHeader(statusCode)
662670
}
663671
}
664672

@@ -818,7 +826,8 @@ func shouldFilterHeaderName(key string) bool {
818826
return false
819827
}
820828
key = strings.ToLower(key)
821-
return key == "range" || key == "host" || key == "http-client-ip" || key == "remote-addr" || key == "accept-encoding"
829+
// 移除对 range 头的过滤,允许 Range 请求正常转发
830+
return key == "host" || key == "http-client-ip" || key == "remote-addr" || key == "accept-encoding"
822831
}
823832

824833
func main() {

0 commit comments

Comments
 (0)