Seperated eventhub service from api service.

wip nbs flow.
Changed uploading a bit.
This commit is contained in:
Mark van der Wal 2020-03-25 21:32:28 +01:00
parent e638370ad4
commit 691be2185e
11 changed files with 151 additions and 81 deletions

View File

@ -23,12 +23,13 @@ namespace FarmmapsApi
return new DiscoveryCache(configuration.DiscoveryEndpointUrl, return new DiscoveryCache(configuration.DiscoveryEndpointUrl,
() => httpFactory.CreateClient()); () => httpFactory.CreateClient());
}) })
.AddSingleton<HttpClientSettings>()
.AddSingleton<FarmmapsEventHub>()
.AddTransient<OpenIdConnectService>() .AddTransient<OpenIdConnectService>()
.AddTransient<FarmmapsAuthenticationHandler>() .AddTransient<FarmmapsAuthenticationHandler>()
.AddHttpClient<FarmmapsApiService>() .AddHttpClient<FarmmapsApiService>()
.AddHttpMessageHandler<FarmmapsAuthenticationHandler>() .AddHttpMessageHandler<FarmmapsAuthenticationHandler>()
.Services; .Services;
;
} }
public static async Task PollTask(TimeSpan retryTime, Func<CancellationTokenSource, Task> callback) public static async Task PollTask(TimeSpan retryTime, Func<CancellationTokenSource, Task> callback)

View File

@ -10,12 +10,12 @@ namespace FarmmapsApi.HttpMessageHandlers
{ {
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{ {
if (!request.Headers.Authorization.Scheme.Equals(OidcConstants.AuthenticationSchemes.AuthorizationHeaderBearer) || if (!OidcConstants.AuthenticationSchemes.AuthorizationHeaderBearer.Equals(request.Headers?.Authorization.Scheme) ||
string.IsNullOrEmpty(request.Headers.Authorization.Parameter)) string.IsNullOrEmpty(request.Headers?.Authorization.Parameter))
{ {
return new HttpResponseMessage(HttpStatusCode.Unauthorized) return new HttpResponseMessage(HttpStatusCode.Unauthorized)
{ {
Content = new StringContent("You must authenticate before using the API") Content = new StringContent("You must authenticated before using the API")
}; };
} }
return await base.SendAsync(request, cancellationToken); return await base.SendAsync(request, cancellationToken);

View File

@ -0,0 +1,7 @@
namespace FarmmapsApi.Models
{
public class HttpClientSettings
{
public string BearerToken { get; set; }
}
}

View File

@ -0,0 +1,18 @@
using System;
using Google.Apis.Upload;
namespace FarmmapsApi.Models
{
public class UploadResults
{
public IUploadProgress Progress { get; }
public Uri Location { get; }
public UploadResults(IUploadProgress progress, Uri location)
{
Progress = progress;
Location = location;
}
}
}

View File

@ -8,6 +8,7 @@ namespace FarmmapsApi
public const string ITEMS_RESOURCE = "items/{0}"; public const string ITEMS_RESOURCE = "items/{0}";
public const string ITEMS_CREATE_RESOURCE = "items"; public const string ITEMS_CREATE_RESOURCE = "items";
public const string ITEMS_DOWNLOAD_RESOURCE = "items/{0}/download"; public const string ITEMS_DOWNLOAD_RESOURCE = "items/{0}/download";
public const string ITEMS_UPLOAD_RESOURCE = "api/v1/file";
public const string ITEMS_CHILDREN_RESOURCE = "items/{0}/children"; public const string ITEMS_CHILDREN_RESOURCE = "items/{0}/children";
public const string ITEMS_DELETE_RESOURCE = "items/delete"; public const string ITEMS_DELETE_RESOURCE = "items/delete";

View File

@ -13,9 +13,6 @@ using FarmmapsApi.Models;
using Google.Apis.Http; using Google.Apis.Http;
using Google.Apis.Upload; using Google.Apis.Upload;
using IdentityModel; using IdentityModel;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using Winista.Mime; using Winista.Mime;
@ -24,23 +21,28 @@ namespace FarmmapsApi.Services
{ {
public class FarmmapsApiService public class FarmmapsApiService
{ {
private readonly Configuration _configuration;
private readonly HttpClientSettings _httpClientSettings;
private readonly HttpClient _httpClient; private readonly HttpClient _httpClient;
private readonly OpenIdConnectService _openIdConnectService; private readonly OpenIdConnectService _openIdConnectService;
private readonly Configuration _configuration;
private HubConnection _hubConnection; public FarmmapsApiService(HttpClientSettings httpClientSettings, HttpClient httpClient,
public event Action<EventMessage> EventCallback;
public FarmmapsApiService(HttpClient httpClient,
OpenIdConnectService openIdConnectService, Configuration configuration) OpenIdConnectService openIdConnectService, Configuration configuration)
{ {
_configuration = configuration;
_httpClientSettings = httpClientSettings;
_httpClient = httpClient; _httpClient = httpClient;
_openIdConnectService = openIdConnectService; _openIdConnectService = openIdConnectService;
_configuration = configuration;
_httpClient.BaseAddress = new Uri(configuration.Endpoint); _httpClient.BaseAddress = new Uri(configuration.Endpoint);
_httpClient.DefaultRequestHeaders.Add("Accept", MediaTypeNames.Application.Json); _httpClient.DefaultRequestHeaders.Add("Accept", MediaTypeNames.Application.Json);
if (!string.IsNullOrEmpty(_httpClientSettings.BearerToken))
{
_httpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue(OidcConstants.AuthenticationSchemes.AuthorizationHeaderBearer,
_httpClientSettings.BearerToken);
}
} }
public async Task AuthenticateAsync() public async Task AuthenticateAsync()
@ -57,33 +59,10 @@ namespace FarmmapsApi.Services
if (token.IsError) if (token.IsError)
throw new AuthenticationException(token.Error); throw new AuthenticationException(token.Error);
_httpClientSettings.BearerToken = token.AccessToken;
_httpClient.DefaultRequestHeaders.Authorization = _httpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue(OidcConstants.AuthenticationSchemes.AuthorizationHeaderBearer, new AuthenticationHeaderValue(OidcConstants.AuthenticationSchemes.AuthorizationHeaderBearer,
token.AccessToken); token.AccessToken);
await StartEventHub(token.AccessToken);
}
private async Task StartEventHub(string accessToken)
{
var eventEndpoint = $"{_configuration.Endpoint}/EventHub";
_hubConnection = new HubConnectionBuilder()
.WithUrl(eventEndpoint, HttpTransportType.WebSockets,
options => options.SkipNegotiation = true)
.ConfigureLogging(log => log.AddConsole())
.WithAutomaticReconnect()
.Build();
await _hubConnection.StartAsync();
await AuthenticateEventHub(accessToken);
_hubConnection.Reconnected += async s => await AuthenticateEventHub(accessToken);
_hubConnection.On("event", (EventMessage eventMessage) => EventCallback?.Invoke(eventMessage));
}
private async Task AuthenticateEventHub(string accessToken)
{
await _hubConnection.SendAsync("authenticate", accessToken);
} }
public async Task<string> GetCurrentUserCodeAsync() public async Task<string> GetCurrentUserCodeAsync()
@ -301,10 +280,11 @@ namespace FarmmapsApi.Services
/// </summary> /// </summary>
/// <param name="filePath"></param> /// <param name="filePath"></param>
/// <param name="parentItemCode"></param> /// <param name="parentItemCode"></param>
/// <param name="uploadCallback"></param> /// <param name="progressCallback"></param>
/// <returns></returns> /// <returns></returns>
/// <exception cref="FileNotFoundException"></exception> /// <exception cref="FileNotFoundException"></exception>
public async Task<Uri> UploadFile(string filePath, string parentItemCode, Action<IUploadProgress> uploadCallback) public async Task<UploadResults> UploadFile(string filePath, string parentItemCode,
Action<IUploadProgress> progressCallback = null)
{ {
if (!File.Exists(filePath)) if (!File.Exists(filePath))
throw new FileNotFoundException($"File not found {filePath}"); throw new FileNotFoundException($"File not found {filePath}");
@ -321,16 +301,17 @@ namespace FarmmapsApi.Services
Size = uploadStream.Length Size = uploadStream.Length
}; };
Uri uploadIdentifierUri = null;
using var httpClient = CreateConfigurableHttpClient(_httpClient); using var httpClient = CreateConfigurableHttpClient(_httpClient);
var farmmapsUploader = new FarmmapsUploader(httpClient, uploadStream, request, mimeType.ToString()); var farmmapsUploader = new FarmmapsUploader(httpClient, uploadStream, request,
mimeType.ToString(), ResourceEndpoints.ITEMS_UPLOAD_RESOURCE);
farmmapsUploader.ProgressChanged += uploadCallback; Uri location = null;
farmmapsUploader.UploadSessionData += data => uploadIdentifierUri = data.UploadUri; farmmapsUploader.ProgressChanged += progressCallback;
farmmapsUploader.UploadSessionData += data => location = data.UploadUri;
await farmmapsUploader.UploadAsync(); var progress = await farmmapsUploader.UploadAsync();
return uploadIdentifierUri; return new UploadResults(progress, location);
} }
/// <summary> /// <summary>
@ -356,7 +337,8 @@ namespace FarmmapsApi.Services
}; };
using var httpClient = CreateConfigurableHttpClient(_httpClient); using var httpClient = CreateConfigurableHttpClient(_httpClient);
var farmmapsUploader = new FarmmapsUploader(httpClient, uploadStream, request, string.Empty); var farmmapsUploader = new FarmmapsUploader(httpClient, uploadStream,
request, string.Empty, ResourceEndpoints.ITEMS_UPLOAD_RESOURCE);
farmmapsUploader.ProgressChanged += uploadCallback; farmmapsUploader.ProgressChanged += uploadCallback;
await farmmapsUploader.ResumeAsync(location); await farmmapsUploader.ResumeAsync(location);
@ -366,12 +348,14 @@ namespace FarmmapsApi.Services
private ConfigurableHttpClient CreateConfigurableHttpClient(HttpClient parent) private ConfigurableHttpClient CreateConfigurableHttpClient(HttpClient parent)
{ {
var googleHttpClient = new HttpClientFactory().CreateHttpClient(new CreateHttpClientArgs {GZipEnabled = true, ApplicationName = "FarmMaps"}); var googleHttpClient = new HttpClientFactory().CreateHttpClient(new CreateHttpClientArgs
{GZipEnabled = true, ApplicationName = "FarmMaps"});
googleHttpClient.BaseAddress = parent.BaseAddress; googleHttpClient.BaseAddress = parent.BaseAddress;
googleHttpClient.DefaultRequestHeaders.Add("Accept", MediaTypeNames.Application.Json); googleHttpClient.DefaultRequestHeaders.Add("Accept", MediaTypeNames.Application.Json);
var authHeader = parent.DefaultRequestHeaders.Authorization; var authHeader = parent.DefaultRequestHeaders.Authorization;
googleHttpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(authHeader.Scheme, authHeader.Parameter); googleHttpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue(authHeader.Scheme, authHeader.Parameter);
return googleHttpClient; return googleHttpClient;
} }

View File

@ -0,0 +1,45 @@
using System;
using System.Threading.Tasks;
using FarmmapsApi.Models;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Logging;
namespace FarmmapsApi.Services
{
public class FarmmapsEventHub
{
private readonly Configuration _configuration;
private readonly HttpClientSettings _httpClientSettings;
private HubConnection _hubConnection;
public event Action<EventMessage> EventCallback;
public FarmmapsEventHub(HttpClientSettings httpClientSettings, Configuration configuration)
{
_httpClientSettings = httpClientSettings;
_configuration = configuration;
}
public async Task StartEventHub()
{
if(_hubConnection != null)
throw new Exception("EventHub already started");
var eventEndpoint = $"{_configuration.Endpoint}/EventHub";
_hubConnection = new HubConnectionBuilder()
.WithUrl(eventEndpoint, HttpTransportType.WebSockets,
options => options.SkipNegotiation = true)
.ConfigureLogging(log => log.AddConsole())
.WithAutomaticReconnect()
.Build();
await _hubConnection.StartAsync();
await _hubConnection.SendAsync("authenticate", _httpClientSettings.BearerToken);
_hubConnection.Reconnected += async s => await _hubConnection.SendAsync("authenticate", _httpClientSettings.BearerToken);
_hubConnection.On("Event", (EventMessage eventMessage) => EventCallback?.Invoke(eventMessage));
}
}
}

View File

@ -55,7 +55,7 @@ namespace FarmmapsApi.Services
/// completed. /// completed.
/// Caller is responsible for closing the <paramref name="contentStream"/>. /// Caller is responsible for closing the <paramref name="contentStream"/>.
/// </remarks> /// </remarks>
public FarmmapsUploader(ConfigurableHttpClient httpClient, Stream contentStream, FileRequest body, string contentType) public FarmmapsUploader(ConfigurableHttpClient httpClient, Stream contentStream, FileRequest body, string contentType, string path)
: base(contentStream, : base(contentStream,
new ResumableUploadOptions new ResumableUploadOptions
{ {
@ -69,9 +69,12 @@ namespace FarmmapsApi.Services
_streamLength = ContentStream.CanSeek ? ContentStream.Length : UnknownSize; _streamLength = ContentStream.CanSeek ? ContentStream.Length : UnknownSize;
var twoMB = 2 * 0x100000;
ChunkSize = twoMB;
body.ChunkSize = ChunkSize; body.ChunkSize = ChunkSize;
Body = body; Body = body;
Path = "api/v1/file";
Path = path;
HttpClient = httpClient; HttpClient = httpClient;
HttpMethod = HttpConsts.Post; HttpMethod = HttpConsts.Post;
ContentType = contentType; ContentType = contentType;

View File

@ -5,6 +5,7 @@ namespace FarmmapsApiSamples
public const string USERINPUT_ITEMTYPE = "vnd.farmmaps.itemtype.user.input"; public const string USERINPUT_ITEMTYPE = "vnd.farmmaps.itemtype.user.input";
public const string GEOTIFF_ITEMTYPE = "vnd.farmmaps.itemtype.geotiff"; public const string GEOTIFF_ITEMTYPE = "vnd.farmmaps.itemtype.geotiff";
public const string CROPFIELD_ITEMTYPE = "vnd.farmmaps.itemtype.cropfield"; public const string CROPFIELD_ITEMTYPE = "vnd.farmmaps.itemtype.cropfield";
public const string SHAPE_PROCESSED_ITEMTYPE = "vnd.farmmaps.itemtype.shape.processed";
public const string VRANBS_TASK = "vnd.farmmaps.task.vranbs"; public const string VRANBS_TASK = "vnd.farmmaps.task.vranbs";
} }
} }

View File

@ -15,48 +15,59 @@ namespace FarmmapsApiSamples
{ {
private readonly ILogger<NbsApp> _logger; private readonly ILogger<NbsApp> _logger;
private readonly FarmmapsApiService _farmmapsApiService; private readonly FarmmapsApiService _farmmapsApiService;
private readonly FarmmapsEventHub _farmmapsEventHub;
private readonly NitrogenService _nitrogenService; private readonly NitrogenService _nitrogenService;
public NbsApp(ILogger<NbsApp> logger, FarmmapsApiService farmmapsApiService, public NbsApp(ILogger<NbsApp> logger, FarmmapsApiService farmmapsApiService,
NitrogenService nitrogenService) FarmmapsEventHub farmmapsEventHub, NitrogenService nitrogenService)
{ {
_logger = logger; _logger = logger;
_farmmapsApiService = farmmapsApiService; _farmmapsApiService = farmmapsApiService;
_farmmapsEventHub = farmmapsEventHub;
_nitrogenService = nitrogenService; _nitrogenService = nitrogenService;
farmmapsApiService.EventCallback += OnEvent; _farmmapsEventHub.EventCallback += OnEvent;
} }
private void OnEvent(EventMessage @event) private void OnEvent(EventMessage @event)
{ {
_logger.LogInformation(@event.EventType); // _logger.LogInformation(@event.EventType);
} }
public async Task RunAsync() public async Task RunAsync()
{ {
try try
{ {
_logger.LogInformation("NBS sample app started");
await _farmmapsApiService.AuthenticateAsync();
_logger.LogInformation("Authenticated client credentials");
var roots = await _farmmapsApiService.GetCurrentUserRootsAsync(); var roots = await _farmmapsApiService.GetCurrentUserRootsAsync();
// upload data to Uploaded // upload data to Uploaded
var uploadedRoot = roots.SingleOrDefault(r => r.Name == "Uploaded"); var uploadedRoot = roots.SingleOrDefault(r => r.Name == "Uploaded");
if (uploadedRoot != null) if (uploadedRoot != null)
{ {
await _farmmapsApiService.UploadFile(Path.Combine("Data", "Scan_1_20190605.zip"), uploadedRoot.Code, var dataPath = Path.Combine("Data", "Scan_1_20190605.zip");
progress => var result = await _farmmapsApiService.UploadFile(dataPath, uploadedRoot.Code,
{ progress => _logger.LogInformation($"Status: {progress.Status} - BytesSent: {progress.BytesSent}"));
_logger.LogInformation($"Status: {progress.Status} - BytesSent: {progress.BytesSent}");
if (progress.Status == UploadStatus.Failed) if (result.Progress.Status == UploadStatus.Failed)
_logger.LogError($"Uploading failed {progress.Exception.Message}"); {
}); _logger.LogError($"Uploading failed {result.Progress.Exception.Message}");
return;
}
// find file we need to use, we use delay as hack until events work correctly
// await Task.Delay(10000);
// var isariaDataFilter = JObject.Parse(@"");
// var uploadedFilesChildren = await
// _farmmapsApiService.GetItemChildrenAsync(uploadedRoot.Code, SHAPE_PROCESSED_ITEMTYPE);
// need to transform shape data to geotiff // need to transform shape data to geotiff
// var shapeToGeotiffRequest = new TaskRequest()
// {
// TaskType = "vnd.farmmaps.task.shapetogeotiff"
// };
// var taskCode = await _farmmapsApiService.QueueTaskAsync(, shapeToGeotiffRequest);
var myDriveRoot = roots.SingleOrDefault(r => r.Name == "My drive"); var myDriveRoot = roots.SingleOrDefault(r => r.Name == "My drive");
if (myDriveRoot != null) if (myDriveRoot != null)

View File

@ -1,10 +1,11 @@
using System.Threading.Tasks; using System;
using System.Threading.Tasks;
using FarmmapsApi; using FarmmapsApi;
using FarmmapsApi.Models; using FarmmapsApi.Models;
using FarmmapsApi.Services;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
namespace FarmmapsApiSamples namespace FarmmapsApiSamples
{ {
@ -20,17 +21,15 @@ namespace FarmmapsApiSamples
var serviceProvider = new ServiceCollection() var serviceProvider = new ServiceCollection()
.AddLogging(opts => opts .AddLogging(opts => opts
.AddConsole(c => .AddConsole()
{
c.IncludeScopes = false;
c.Format = ConsoleLoggerFormat.Default;
})
.AddFilter("System.Net.Http", LogLevel.Warning)) .AddFilter("System.Net.Http", LogLevel.Warning))
.AddFarmmapsServices(configuration) .AddFarmmapsServices(configuration)
.AddTransient<NitrogenService>() .AddTransient<NitrogenService>()
.AddSingleton<IApp, NbsApp>() .AddSingleton<IApp, NbsApp>()
.BuildServiceProvider(); .BuildServiceProvider();
await serviceProvider.GetService<FarmmapsApiService>().AuthenticateAsync();
await serviceProvider.GetService<FarmmapsEventHub>().StartEventHub();
await serviceProvider.GetService<IApp>().RunAsync(); await serviceProvider.GetService<IApp>().RunAsync();
} }
} }