Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming robustness proposal #90

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"workbench.colorCustomizations": {
"activityBar.background": "#5A1219",
"titleBar.activeBackground": "#7E1A24",
"titleBar.activeForeground": "#FEFBFB"
}
}
143 changes: 89 additions & 54 deletions Mastonet/TimelineHttpStreaming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,74 +24,109 @@ public TimelineHttpStreaming(StreamingType type, string? param, string instance,
this.instance = instance;
}

public override async Task Start()
public override async Task Start(TimeSpan? timeout = null, bool restart = true)
{
string url = "https://" + instance;
switch (streamingType)
do
{
case StreamingType.User:
url += "/api/v1/streaming/user";
break;
case StreamingType.Public:
url += "/api/v1/streaming/public";
break;
case StreamingType.PublicLocal:
url += "/api/v1/streaming/public/local";
break;
case StreamingType.Hashtag:
url += "/api/v1/streaming/hashtag?tag=" + param;
break;
case StreamingType.HashtagLocal:
url += "/api/v1/streaming/hashtag/local?tag=" + param;
break;
case StreamingType.List:
url += "/api/v1/streaming/list?list=" + param;
break;
case StreamingType.Direct:
url += "/api/v1/streaming/direct";
break;
default:
throw new NotImplementedException();
}
string url = "https://" + instance;
switch (streamingType)
{
case StreamingType.User:
url += "/api/v1/streaming/user";
break;
case StreamingType.Public:
url += "/api/v1/streaming/public";
break;
case StreamingType.PublicLocal:
url += "/api/v1/streaming/public/local";
break;
case StreamingType.Hashtag:
url += "/api/v1/streaming/hashtag?tag=" + param;
break;
case StreamingType.HashtagLocal:
url += "/api/v1/streaming/hashtag/local?tag=" + param;
break;
case StreamingType.List:
url += "/api/v1/streaming/list?list=" + param;
break;
case StreamingType.Direct:
url += "/api/v1/streaming/direct";
break;
default:
throw new NotImplementedException();
}

using (var request = new HttpRequestMessage(HttpMethod.Get, url))
using (cts = new CancellationTokenSource())
{
request.Headers.Add("Authorization", "Bearer " + accessToken);
using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cts.Token))
DateTime lastReceivedValidLine = DateTime.Now;
try
{
var stream = await response.Content.ReadAsStreamAsync();
using (var reader = new StreamReader(stream))
using (var request = new HttpRequestMessage(HttpMethod.Get, url))
using (cts = new CancellationTokenSource())
{
string? eventName = null;
string? data = null;

while (true)
request.Headers.Add("Authorization", "Bearer " + accessToken);
using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cts.Token))
{
var line = await reader.ReadLineAsync();

if (string.IsNullOrEmpty(line) || line.StartsWith(":"))
var stream = await response.Content.ReadAsStreamAsync();
using (var reader = new StreamReader(stream))
{
eventName = data = null;
continue;
}
string? eventName = null;
string? data = null;

if (line.StartsWith("event: "))
{
eventName = line.Substring("event: ".Length).Trim();
}
else if (line.StartsWith("data: "))
{
data = line.Substring("data: ".Length);
if (eventName != null)
while (true)
{
SendEvent(eventName, data);
var line = await reader.ReadLineAsync();

if (string.IsNullOrEmpty(line))
{
// reader returned without a line because of BaseStream.ReadTimeout
var timedOut = timeout != null && lastReceivedValidLine.Add(timeout.Value) < DateTime.Now;
if (reader.EndOfStream || timedOut)
{
// it has been too long since a valid line
var timeoutDuration = DateTime.Now.Subtract(lastReceivedValidLine);
throw new TimeoutException($"TimelineHttpStreaming timed out after: {timeoutDuration.ToString()}");
}
else
{
// nothing to do here, we haven't timed out yet
eventName = data = null;
continue;
}
}

if (line.StartsWith(":"))
{
lastReceivedValidLine = DateTime.Now;
eventName = data = null;
continue;
}

if (line.StartsWith("event: "))
{
lastReceivedValidLine = DateTime.Now;
eventName = line.Substring("event: ".Length).Trim();
}
else if (line.StartsWith("data: "))
{
lastReceivedValidLine = DateTime.Now;
data = line.Substring("data: ".Length);
if (eventName != null)
{
SendEvent(eventName, data);
}
}
}
}
}
}
}
}
catch (TimeoutException)
{
if (!restart)
throw;
else
NotifyStreamRestarted(lastReceivedValidLine);
}
} while (restart);
}

public override void Stop()
Expand Down
12 changes: 11 additions & 1 deletion Mastonet/TimelineStreaming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@ public abstract class TimelineStreaming
public event EventHandler<StreamFiltersChangedEventArgs>? OnFiltersChanged;
public event EventHandler<StreamConversationEvenTargs>? OnConversation;

public event Action<DateTime> OnStreamRestarted;

protected TimelineStreaming(StreamingType type, string? param, string? accessToken)
{
this.streamingType = type;
this.param = param;
this.accessToken = accessToken;
}

public abstract Task Start();
public abstract Task Start(TimeSpan? timeout = null, bool restart = true);
public abstract void Stop();

protected void NotifyStreamRestarted(DateTime lastKnownSuccess)
{
if (OnStreamRestarted != null)
{
OnStreamRestarted.Invoke(lastKnownSuccess);
}
}

protected void SendEvent(string eventName, string data)
{
switch (eventName)
Expand Down
53 changes: 39 additions & 14 deletions Mastonet/TimelineWebSocketStreaming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public TimelineWebSocketStreaming(StreamingType type, string? param, string inst
this.instanceGetter = instanceGetter;
}

public override async Task Start()
public override async Task Start(TimeSpan? timeout = null, bool restart = true)
{
var instance = await instanceGetter;
var url = instance?.Urls?.StreamingAPI;
Expand Down Expand Up @@ -66,31 +66,56 @@ public override async Task Start()
throw new NotImplementedException();
}

socket = new ClientWebSocket();
await socket.ConnectAsync(new Uri(url), CancellationToken.None);

byte[] buffer = new byte[receiveChunkSize];
MemoryStream ms = new MemoryStream();
while (socket != null)
var lastValidMessage = DateTime.Now;
var timedOut = false;
do
{
var result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
try
{
if (socket == null || socket.State != WebSocketState.Open || socket.CloseStatus != WebSocketCloseStatus.Empty)
{
if (socket != null) { socket.Dispose(); }
socket = new ClientWebSocket();
await socket.ConnectAsync(new Uri(url), CancellationToken.None);
}

ms.Write(buffer, 0, result.Count);
var result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

if (result.EndOfMessage)
{
var messageStr = Encoding.UTF8.GetString(ms.ToArray());
ms.Write(buffer, 0, result.Count);

var message = JsonConvert.DeserializeObject<TimelineMessage>(messageStr);
if (message != null)
if (result.EndOfMessage)
{
SendEvent(message.Event, message.Payload);
var messageStr = Encoding.UTF8.GetString(ms.ToArray());

var message = JsonConvert.DeserializeObject<TimelineMessage>(messageStr);
if (message != null)
{
lastValidMessage = DateTime.Now;
SendEvent(message.Event, message.Payload);
}
ms.Dispose();
ms = new MemoryStream();
}

ms.Dispose();
ms = new MemoryStream();
timedOut = timeout != null && lastValidMessage.Add(timeout.Value) < DateTime.Now;
if (timedOut)
{
var timeoutDuration = DateTime.Now.Subtract(lastValidMessage);
throw new TimeoutException($"TimelineWebSocketStreaming timed out after: {timeoutDuration.ToString()}");
}
} catch (TimeoutException)
{
if (!restart)
throw;
else
NotifyStreamRestarted(lastValidMessage);
}
}
while (restart);

ms.Dispose();

this.Stop();
Expand Down