1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
from base64 import b64encode
from dataclasses import dataclass
from irctokens import StatefulDecoder, build
from typing import Optional
import trio
@dataclass
class Config:
host: str
nick: str
password: str
port: int = 6667
leader: str = ','
user: str = 'columbo'
channels: Optional[list[str]] = None
realname: str = 'columbo-irc_client v0.1'
current_nick: Optional[str] = None
encoding: str = 'utf-8'
async def _sender(stream, config, inbox):
def prepare(line):
print(f'>{line}')
return f'{line.format()}\r\n'.encode(config.encoding)
config.current_nick = config.nick
preamble = [
('CAP', ['REQ', 'sasl']),
('USER', [config.user, '0', '*', config.realname]),
('NICK', [config.nick]),
]
for args in preamble:
await stream.send_all(prepare(build(*args)))
async with inbox:
async for line in inbox:
await stream.send_all(prepare(line))
async def _handle(config, channel, outbox, line):
print(f'<{line}')
if line.command == 'PING':
await outbox.send(build('PONG', [line.params[0]]))
elif line.command == 'CAP':
if line.params[1] == 'ACK' and line.params[2] == 'sasl':
await outbox.send(build('AUTHENTICATE', ['PLAIN']))
elif line.command == 'AUTHENTICATE':
if line.params[0] == '+':
creds = b64encode(f'\0{config.nick}\0{config.password}'.encode(config.encoding)).decode()
await outbox.send(build('AUTHENTICATE', [creds]))
elif line.command == '900': # RPL_LOGGEDIN
await outbox.send(build('CAP', ['END']))
elif line.command == '001': # RPL_WELCOME
config.current_nick = line.params[0]
if config.channels:
for channel in config.channels:
await outbox.send(build('JOIN', [channel]))
elif line.command == 'PRIVMSG':
await channel.send(((outbox, config), line))
async def _receiver(stream, config, channel, outbox):
decoder = StatefulDecoder()
async with channel, outbox:
async for data in stream:
for line in decoder.push(data):
await _handle(config, channel, outbox, line)
async def connect(config, channel):
stream = await trio.open_tcp_stream(config.host, config.port)
outbox_send, outbox_recv = trio.open_memory_channel(0)
async with stream:
async with trio.open_nursery() as nursery:
nursery.start_soon(_sender, stream, config, outbox_recv)
nursery.start_soon(_receiver, stream, config, channel, outbox_send)
|