-
Notifications
You must be signed in to change notification settings - Fork 292
Expand file tree
/
Copy path哇哇APP.py
More file actions
273 lines (240 loc) · 9.46 KB
/
哇哇APP.py
File metadata and controls
273 lines (240 loc) · 9.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""
@header({
searchable: 1,
filterable: 1,
quickSearch: 1,
title: '哇哇APP',
lang: 'hipy'
})
"""
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import sys
import time
import uuid
from base64 import b64decode, b64encode
from concurrent.futures import ThreadPoolExecutor, as_completed
from Crypto.Cipher import AES
from Crypto.Hash import SHA256, MD5
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Util.Padding import unpad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
    """Spider implementation for the '哇哇APP' video source.

    The API host, app key and RSA private key are not hard-coded; they are
    downloaded (AES-encrypted) from a remote configuration file in ``init``.
    Every API request carries headers signed with that RSA key.
    """

    def init(self, extend=""):
        """Fetch the remote configuration and store host, app key and RSA key.

        Args:
            extend: Unused by this spider.
        """
        self.host, self.appKey, self.rsakey = self.userinfo()

    def getName(self):
        """Not implemented; returns None."""
        pass

    def isVideoFormat(self, url):
        """Not implemented; returns None."""
        pass

    def manualVideoCheck(self):
        """Not implemented; returns None."""
        pass

    def destroy(self):
        """Not implemented; returns None."""
        pass

    def homeContent(self, filter):
        """Return the category list and the per-category filter definitions.

        Args:
            filter: Unused by this spider.

        Returns:
            dict with ``class`` (list of ``type_name``/``type_id`` dicts) and
            ``filters`` (mapping of ``str(type_id)`` to filter groups).
        """
        data = self.fetch(f"{self.host}/api.php/zjv6.vod/types", headers=self.getheader()).json()
        # Filter keys understood by this spider and their display names.
        dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序", }
        # Display label -> value sent in the "by" request parameter.
        sl = {'按更新': 'time', '按播放': 'hits', '按评分': 'score', '按收藏': 'store_num'}
        filters = {}
        classes = []
        for item in data['data']['list']:
            classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})

            type_extend = item.get("type_extend")
            if not isinstance(type_extend, dict):
                # Guard against a missing or non-mapping value.
                type_extend = {}
            # The sort options are always offered, so every category ends up
            # with at least one filter group.
            type_extend = dict(type_extend, by='按更新,按播放,按评分,按收藏')

            groups = []
            for dkey, raw in type_extend.items():
                if dkey not in dy or not isinstance(raw, str) or raw.strip() == "":
                    continue
                value_array = [
                    {"n": value.strip(), "v": sl[value.strip()] if dkey == "by" else value.strip()}
                    for value in raw.split(",")
                    if value.strip() != ""
                ]
                groups.append({"key": dkey, "name": dy[dkey], "value": value_array})
            filters[str(item["type_id"])] = groups
        return {"class": classes, "filters": filters}

    def homeVideoContent(self):
        """Return the ``vod_list`` of the first entry of the ranking endpoint.

        Returns:
            dict with a ``list`` key; the list is empty when the endpoint
            returns no entries.
        """
        data = self.fetch(f"{self.host}/api.php/zjv6.vod/vodPhbAll", headers=self.getheader()).json()
        rankings = data['data']['list']
        return {'list': rankings[0]['vod_list'] if rankings else []}

    def categoryContent(self, tid, pg, filter, extend):
        """Return one page of videos for a category.

        Args:
            tid: Category id, sent as the ``type`` parameter.
            pg: Page number, sent as ``page`` and echoed in the result.
            filter: Unused by this spider.
            extend: Mapping of selected filter values (``class``, ``lang``,
                ``area``, ``year``, ``by``); missing keys are sent as "".

        Returns:
            dict with ``list``, ``page``, ``pagecount``, ``limit``, ``total``.
        """
        params = {
            "type": tid,
            "class": extend.get('class', ''),
            "lang": extend.get('lang', ''),
            "area": extend.get('area', ''),
            "year": extend.get('year', ''),
            "by": extend.get('by', ''),
            "page": pg,
            "limit": "12"
        }
        data = self.fetch(f"{self.host}/api.php/zjv6.vod", headers=self.getheader(), params=params).json()
        return {
            'list': data['data']['list'],
            'page': pg,
            # The totals are fixed placeholder values, not taken from the response.
            'pagecount': 9999,
            'limit': 90,
            'total': 999999,
        }

    def detailContent(self, ids):
        """Return the detail record for ``ids[0]`` with flattened play lists.

        Each episode is encoded as ``name$<base64 of the episode JSON>``;
        episodes of one player are joined with ``#`` and players with ``$$$``.

        Args:
            ids: Sequence whose first element is the video id.

        Returns:
            dict with a single-element ``list``.
        """
        data = self.fetch(f"{self.host}/api.php/zjv6.vod/detail?vod_id={ids[0]}&rel_limit=10",
                          headers=self.getheader()).json()
        vod = data['data']
        episodes_by_player = {}
        for player in vod['vod_play_list']:
            show = player['player_info']['show']
            # Keyed by display name: a later player with the same name
            # replaces an earlier one.
            episodes_by_player[show] = []
            for episode in player['urls']:
                # Carry the player's parser list with each episode so that
                # playerContent can use it later.
                episode['parse'] = player['player_info']['parse2']
                name = episode.pop('name')
                episodes_by_player[show].append(f"{name}${self.e64(json.dumps(episode))}")
        vod.update({
            'vod_play_from': '$$$'.join(episodes_by_player.keys()),
            'vod_play_url': '$$$'.join('#'.join(urls) for urls in episodes_by_player.values()),
        })
        vod.pop('vod_play_list', None)
        vod.pop('type', None)
        return {'list': [vod]}

    def searchContent(self, key, quick, pg="1"):
        """Search videos by keyword.

        Args:
            key: Search keyword, sent as the ``wd`` parameter.
            quick: Unused by this spider.
            pg: Page number.

        Returns:
            dict with ``list`` and ``page``.
        """
        # Pass the keyword through ``params`` so the HTTP layer encodes it;
        # interpolating it into the URL breaks on characters such as '&' or '#'.
        params = {"page": pg, "limit": "20", "wd": key}
        data = self.fetch(f"{self.host}/api.php/zjv6.vod", headers=self.getheader(), params=params).json()
        return {'list': data['data']['list'], 'page': pg}

    def playerContent(self, flag, id, vipFlags):
        """Resolve a play id (as produced by ``detailContent``) to a play URL.

        If the episode carries parser URLs and one of them yields a result,
        that URL is returned with ``parse`` 0; otherwise the original URL is
        returned with ``parse`` 1.

        Args:
            flag: Unused by this spider.
            id: Base64-encoded JSON with at least a ``url`` key.
            vipFlags: Unused by this spider.

        Returns:
            dict with ``parse``, ``url`` and ``header``.
        """
        ids = json.loads(self.d64(id))
        target_url = ids['url']
        header = {'User-Agent': 'dart:io'}
        try:
            parse_str = ids.get('parse', '')
            if parse_str:
                result_url = self.try_all_parses(parse_str.split(','), target_url)
                if result_url:
                    return {'parse': 0, 'url': result_url, 'header': header}
        except Exception as e:
            # Best effort: any parser failure falls back to the raw URL below.
            print(e)
        return {'parse': 1, 'url': target_url, 'header': header}

    def liveContent(self, url):
        """Not implemented; returns None."""
        pass

    def localProxy(self, param):
        """Not implemented; returns None."""
        pass

    def userinfo(self):
        """Download and decrypt the remote configuration.

        Returns:
            Tuple ``(baseUrl, appKey, appSecret)`` read from the decrypted
            configuration JSON.
        """
        # NOTE(review): the app key and access token below are hard-coded
        # credentials shipped with the source.
        app_key = '3bbf7348cf314874883a18d6b6fcf67a'
        t = str(int(time.time() * 1000))
        uid = self.generate_uid()
        sign = self.md5(f"appKey={app_key}&uid={uid}&time={t}")
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36',
            'Connection': 'Keep-Alive',
            'appKey': app_key,
            'uid': uid,
            'time': t,
            'sign': sign,
        }
        params = {
            'access_token': '74d5879931b9774be10dee3d8c51008e',
        }
        response = self.fetch('https://gitee.com/api/v5/repos/aycapp/openapi/contents/wawaconf.txt', params=params,
                              headers=headers).json()
        data = json.loads(self.decrypt(response['content']))
        return data['baseUrl'], data['appKey'], data['appSecret']

    def e64(self, text):
        """Base64-encode a UTF-8 string; return "" (and print) on failure."""
        try:
            text_bytes = text.encode('utf-8')
            encoded_bytes = b64encode(text_bytes)
            return encoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64编码错误: {str(e)}")
            return ""

    def d64(self, encoded_text):
        """Base64-decode to a UTF-8 string; return "" (and print) on failure."""
        try:
            encoded_bytes = encoded_text.encode('utf-8')
            decoded_bytes = b64decode(encoded_bytes)
            return decoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64解码错误: {str(e)}")
            return ""

    def md5(self, text):
        """Return the hexadecimal MD5 digest of ``text`` (UTF-8 encoded)."""
        h = MD5.new()
        h.update(text.encode('utf-8'))
        return h.hexdigest()

    def generate_uid(self):
        """Return a fresh random UUID4 as a 32-character hex string."""
        return uuid.uuid4().hex

    def getheader(self):
        """Build signed request headers for the API.

        A new uid and millisecond timestamp are generated on every call and
        signed together with the app key.

        Returns:
            dict of HTTP headers.
        """
        t = str(int(time.time() * 1000))
        uid = self.generate_uid()
        sign = self.sign_message(f"appKey={self.appKey}&time={t}&uid={uid}")
        return {
            'User-Agent': 'okhttp/4.9.3',
            'Connection': 'Keep-Alive',
            'uid': uid,
            'time': t,
            'appKey': self.appKey,
            'sign': sign,
        }

    def decrypt(self, encrypted_data):
        """Decrypt the remote configuration payload.

        The payload is base64 text that decodes to a hex string; the hex
        bytes are AES-ECB encrypted and padded to the AES block size.

        Args:
            encrypted_data: Base64-encoded payload.

        Returns:
            The decrypted UTF-8 text.
        """
        key = b64decode('Crm4FXWkk5JItpYirFDpqg==')
        cipher = AES.new(key, AES.MODE_ECB)
        encrypted = bytes.fromhex(self.d64(encrypted_data))
        decrypted = cipher.decrypt(encrypted)
        return unpad(decrypted, AES.block_size).decode('utf-8')

    def sign_message(self, message):
        """Sign ``message`` with the configured RSA key (PKCS#1 v1.5, SHA-256).

        Args:
            message: Text to sign; encoded as UTF-8 before hashing.

        Returns:
            The signature, base64-encoded.
        """
        private_key_str = f"-----BEGIN PRIVATE KEY-----\n{self.rsakey}\n-----END PRIVATE KEY-----"
        private_key = RSA.import_key(private_key_str)
        message_hash = SHA256.new(message.encode('utf-8'))
        signature = pkcs1_15.new(private_key).sign(message_hash)
        return b64encode(signature).decode('utf-8')

    def fetch_url(self, parse_url, target_url):
        """Ask one parser endpoint to resolve ``target_url``.

        Args:
            parse_url: Parser URL prefix; ``target_url`` is appended to it.
            target_url: The URL to resolve.

        Returns:
            The ``url`` (or ``data.url``) field of the JSON response, or None
            on any error, non-200 status, or missing/empty field.
        """
        try:
            response = self.fetch(f"{parse_url.replace('..', '.')}{target_url}",
                                  headers={"user-agent": "okhttp/4.1.0/luob.app"}, timeout=5)
            if response.status_code != 200:
                return None
            data = response.json()
            result_url = data.get('url') or data.get('data', {}).get('url')
            return result_url or None
        except Exception:
            # Best effort: a failing parser simply yields no result.
            return None

    def try_all_parses(self, parse_urls, target_url):
        """Query all parser endpoints concurrently and return the first hit.

        Args:
            parse_urls: Iterable of parser URL prefixes; blank entries are
                ignored.
            target_url: The URL to resolve.

        Returns:
            The first non-empty result to complete, or None if none succeed.
        """
        candidates = [parse_url.strip() for parse_url in parse_urls if parse_url.strip()]
        if not candidates:
            return None
        executor = ThreadPoolExecutor(max_workers=len(candidates))
        try:
            futures = [executor.submit(self.fetch_url, candidate, target_url) for candidate in candidates]
            for future in as_completed(futures):
                try:
                    result = future.result()
                except Exception:
                    continue
                if result:
                    return result
            return None
        finally:
            # Do not wait for slower parsers: a ``with`` block would join all
            # workers on exit and delay the first successful result.
            executor.shutdown(wait=False)