C# - IEventProcessor not reading from Event Hub
I am working through implementing an Event Hub reader using EventProcessorHost and a simple IEventProcessor implementation. I have confirmed that telemetry data is being written to the Event Hub using Paolo Salvatori's excellent Service Bus Explorer. I have configured the EventProcessorHost to use a storage account for leases and checkpoints, and I can see the Event Hub data files in the storage account. The problem I am seeing at this point is that the IEventProcessor implementation does not appear to be reading from the Event Hub.
I am not receiving any exceptions. The test console app is connecting to the storage account without issue. I have noticed that the logging statement I added to the constructor is never being called, so it looks like the receiver is never being created. I feel like I am missing something simple. Can anyone help me determine what I have missed? Thank you!
IEventProcessor implementation:
namespace Receiver
{
    /// <summary>
    /// Minimal <see cref="IEventProcessor"/> that writes every received event to the console
    /// and checkpoints the partition at most once every 30 seconds.
    /// </summary>
    internal class SimpleEventProcessor : IEventProcessor
    {
        // Tracks the time elapsed since the last checkpoint; created and started in OpenAsync.
        private Stopwatch _checkpointStopwatch;

        /// <summary>
        /// Logs that an instance was created, which makes it visible on the console
        /// whether a processor is ever instantiated.
        /// </summary>
        public SimpleEventProcessor()
        {
            Console.WriteLine("simpleeventprocessor created");
        }

        #region Implementation of IEventProcessor

        /// <summary>
        /// Logs the partition id and offset from the lease and starts the checkpoint timer.
        /// </summary>
        /// <param name="context">Partition context whose lease supplies the partition id and offset.</param>
        /// <returns>An already-completed task.</returns>
        public Task OpenAsync(PartitionContext context)
        {
            // The closing quote after {1} was missing in the original format string.
            Console.WriteLine(
                "simpleeventprocessor initialized. partition: '{0}', offset: '{1}'",
                context.Lease.PartitionId,
                context.Lease.Offset);

            _checkpointStopwatch = new Stopwatch();
            _checkpointStopwatch.Start();

            return Task.FromResult<object>(null);
        }

        /// <summary>
        /// Decodes each event body as UTF-8, prints it, and checkpoints if more than
        /// 30 seconds have passed since the previous checkpoint.
        /// </summary>
        /// <param name="context">Partition context used for logging and checkpointing.</param>
        /// <param name="messages">Batch of events to print.</param>
        public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var data in messages.Select(eventData => Encoding.UTF8.GetString(eventData.GetBytes())))
            {
                Console.WriteLine(
                    "message received. partition: '{0}', data: '{1}'",
                    context.Lease.PartitionId,
                    data);
            }

            // Checkpoint on a timer rather than per batch.
            if (_checkpointStopwatch.Elapsed > TimeSpan.FromSeconds(30))
            {
                await context.CheckpointAsync();
                _checkpointStopwatch.Restart();
            }
        }

        /// <summary>
        /// Logs the close reason and writes a final checkpoint when the reason is
        /// <see cref="CloseReason.Shutdown"/>.
        /// </summary>
        /// <param name="context">Partition context used for logging and checkpointing.</param>
        /// <param name="reason">Reason the processor is being closed.</param>
        public async Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine(
                "processor shutting down. partition '{0}', reason: {1}",
                context.Lease.PartitionId,
                reason);

            // Only checkpoint on shutdown; no checkpoint is written for any other reason.
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }

        #endregion
    }
}
Test console code:
namespace EventHubTestConsole
{
    /// <summary>
    /// Test console that registers <c>SimpleEventProcessor</c> with an
    /// <see cref="EventProcessorHost"/> and waits for the user to press Enter.
    /// </summary>
    internal class Program
    {
        /// <summary>Runs <see cref="MainAsync"/> through <c>AsyncPump.Run</c>.</summary>
        private static void Main(string[] args)
        {
            AsyncPump.Run((Func<Task>) MainAsync);
        }

        /// <summary>
        /// Builds the storage connection string, creates the host, registers the
        /// processor and then blocks on <see cref="Console.ReadLine"/>.
        /// </summary>
        private static async Task MainAsync()
        {
            // Setting names use the documented casing for Service Bus / Storage connection strings.
            const string eventHubConnectionString = "Endpoint=<eh endpoint>;SharedAccessKeyName=<key name>;SharedAccessKey=<key>";
            const string eventHubName = "<event hub name>";
            const string storageAccountName = "<storage account name>";
            const string storageAccountKey = "<valid storage key>";

            var storageConnectionString = string.Format(
                "DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
                storageAccountName,
                storageAccountKey);

            // NOTE(review): this prints the storage account key to the console; acceptable only for a local test.
            Console.WriteLine("connecting storage account connectionstring: {0}", storageConnectionString);

            // A fresh GUID is used as the host name on every run.
            var eventProcessorHostName = Guid.NewGuid().ToString();
            var eventProcessorHost = new EventProcessorHost(
                eventProcessorHostName,
                eventHubName,
                EventHubConsumerGroup.DefaultGroupName,
                eventHubConnectionString,
                storageConnectionString);

            var epo = new EventProcessorOptions
            {
                MaxBatchSize = 100,
                // NOTE(review): RegisterEventProcessorAsync is reported to throw when PrefetchCount
                // is below 10; this value is the suspected reason no processor is ever created.
                PrefetchCount = 1,
                ReceiveTimeout = TimeSpan.FromSeconds(20),
                // Start reading from events enqueued up to seven days ago.
                InitialOffsetProvider = (name) => DateTime.Now.AddDays(-7)
            };
            epo.ExceptionReceived += OnExceptionReceived;

            await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo);

            Console.WriteLine("receiving. please enter stop worker.");
            Console.ReadLine();
        }

        /// <summary>Prints the message of any exception raised through the processor options.</summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="args">Event arguments carrying the exception.</param>
        public static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs args)
        {
            Console.WriteLine("event hub exception received: {0}", args.Exception.Message);
        }
    }
}
It looks like the problem is the value of EventProcessorOptions.PrefetchCount.
I changed the code as shown here (removing the AsyncPump and shutting down the receivers cleanly). I found that RegisterEventProcessorAsync throws an exception if PrefetchCount is less than 10.
namespace EventHubTestConsole
{
    /// <summary>
    /// Test console that registers <c>SimpleEventProcessor</c> with an
    /// <see cref="EventProcessorHost"/>, waits for Enter, then unregisters it.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Builds the storage connection string, creates the host, registers the processor,
        /// blocks on <see cref="Console.ReadLine"/> and finally unregisters the processor.
        /// </summary>
        private static void Main(string[] args)
        {
            // Setting names use the documented casing for Service Bus / Storage connection strings.
            const string eventHubConnectionString = "Endpoint=<eh endpoint>;SharedAccessKeyName=<key name>;SharedAccessKey=<key>";
            const string eventHubName = "<event hub name>";
            const string storageAccountName = "<storage account name>";
            const string storageAccountKey = "<valid storage key>";

            var storageConnectionString = string.Format(
                "DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
                storageAccountName,
                storageAccountKey);

            // NOTE(review): this prints the storage account key to the console; acceptable only for a local test.
            Console.WriteLine("connecting storage account connectionstring: {0}", storageConnectionString);

            // A fresh GUID is used as the host name on every run.
            var eventProcessorHostName = Guid.NewGuid().ToString();
            var eventProcessorHost = new EventProcessorHost(
                eventProcessorHostName,
                eventHubName,
                EventHubConsumerGroup.DefaultGroupName,
                eventHubConnectionString,
                storageConnectionString);

            var epo = new EventProcessorOptions
            {
                MaxBatchSize = 100,
                // Registration was observed to throw when this value is below 10.
                PrefetchCount = 10,
                ReceiveTimeout = TimeSpan.FromSeconds(20),
                // Start reading from events enqueued up to seven days ago.
                InitialOffsetProvider = (name) => DateTime.Now.AddDays(-7)
            };
            epo.ExceptionReceived += OnExceptionReceived;

            // Blocking here (no AsyncPump) lets a registration failure surface on the main thread.
            eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo).Wait();

            Console.WriteLine("receiving. please enter stop worker.");
            Console.ReadLine();

            // Shut the receivers down cleanly before the process exits.
            eventProcessorHost.UnregisterEventProcessorAsync().Wait();
        }

        /// <summary>Prints the message of any exception raised through the processor options.</summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="args">Event arguments carrying the exception.</param>
        public static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs args)
        {
            Console.WriteLine("event hub exception received: {0}", args.Exception.Message);
        }
    }
}
Comments
Post a Comment